working on implementatin of MessageQueue by SqliteMessageDb

Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
2026-01-26 08:50:01 +01:00
parent 94f41be391
commit c12786971f
9 changed files with 68 additions and 79 deletions

View File

@@ -1,59 +0,0 @@
/* © SRSoftware 2025 */
package de.srsoftware.umbrella.message;
import static java.lang.System.Logger.Level.DEBUG;
import de.srsoftware.umbrella.core.model.Envelope;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.User;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class InMemoryQueue implements MessageQueue<TranslatedMessage> {
public static final System.Logger LOG = System.getLogger(InMemoryQueue.class.getSimpleName());
private LinkedList<Envelope<TranslatedMessage>> queue = new LinkedList<>();
private Stream<Envelope<TranslatedMessage>> envelopes(){
return queue.stream();
}
@Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
return envelopes().filter(env -> env.isFor(user)).toList();
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
return envelopes().filter(env -> env.hashCode() == hash).findAny();
}
@Override
public Stream<User> getReceivers() {
return envelopes().map(Envelope::receivers).flatMap(Set::stream).distinct();
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
for (var env : queue){
if (env.hashCode() == hash) {
LOG.log(DEBUG,"Removing {0} from receiver list of {1}…",user.name(),env.message().subject());
env.receivers().remove(user);
if (env.receivers().isEmpty()) {
LOG.log(DEBUG,"No more due receiers, removing {0} from queue…",env.message().subject());
queue.remove(env);
}
return Optional.of(env);
}
}
return Optional.empty();
}
@Override
public void push(Envelope<TranslatedMessage> envelope) {
queue.add(envelope);
LOG.log(DEBUG,"{0} for {1} pushed to queue",envelope.message().subject(),envelope.receivers().stream().map(User::name).collect(Collectors.joining(", ")));
}
}

View File

@@ -10,8 +10,8 @@ import java.util.stream.Stream;
public interface MessageQueue<T extends Message<?>> {
public List<Envelope<T>> getEnvelopesFor(User user);
public Optional<Envelope<T>> getEnvelope(int hash);
public Optional<Envelope<T>> getEnvelope(long id);
Stream<User> getReceivers();
public Optional<Envelope<T>> markRead(int hash, User user);
public Optional<Envelope<T>> markRead(long messageId, User user);
public void push(Envelope<T> envelope);
}

View File

@@ -170,7 +170,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
@Override
public void onEvent(Event<?> event) {
var message = new TranslatableMessage(event.initiator(),event.subject(),event.describe(),null);
send(new Envelope<>(message,event.audience()));
send(new Envelope<>(0,message,event.audience()));
}
private boolean patchSettings(HttpExchange ex, UmbrellaUser user) throws IOException {
@@ -295,7 +295,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
var env = map.get(lang);
if (env == null){
TranslatedMessage translated = tm.translate(lang);
env = new Envelope<>(translated,new HashSet<>());
env = new Envelope<>(0, translated,new HashSet<>());
map.put(lang,env);
}
env.receivers().add(receiver);

View File

@@ -8,6 +8,7 @@ import static de.srsoftware.tools.jdbc.Query.SelectQuery.ALL;
import static de.srsoftware.umbrella.core.Errors.*;
import static de.srsoftware.umbrella.core.ModuleRegistry.userService;
import static de.srsoftware.umbrella.core.ResponseCode.HTTP_SERVER_ERROR;
import static de.srsoftware.umbrella.core.constants.Constants.COUNT;
import static de.srsoftware.umbrella.core.constants.Field.*;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*;
import static de.srsoftware.umbrella.core.model.Translatable.t;
@@ -16,6 +17,7 @@ import static de.srsoftware.umbrella.message.model.Schedule.schedule;
import static java.text.MessageFormat.format;
import static java.time.ZoneOffset.UTC;
import de.srsoftware.tools.jdbc.Query;
import de.srsoftware.umbrella.core.BaseDb;
import de.srsoftware.umbrella.core.constants.Text;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
@@ -110,8 +112,32 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} INTEGER PRIMARY KEY, {2} VARCHAR(255) NOT N
}
@Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({hash}) not implemented!","class",getClass().getSimpleName(),"hash",hash); // TODO
public Optional<Envelope<TranslatedMessage>> getEnvelope(long messageId) {
try {
var attachments = new ArrayList<Attachment>();
var rs = select(ALL).from(TABLE_ATTACHMENTS).where(MESSAGE_ID,equal(messageId)).exec(db);
while (rs.next()) attachments.add(new Attachment(rs.getString(NAME),rs.getString(MIME),rs.getBytes(DATA)));
rs.close();
var receivers = new ArrayList<User>();
rs = select(ALL).from(TABLE_ATTACHMENTS).where(MESSAGE_ID,equal(messageId)).exec(db);
while (rs.next()) receivers.add(new User(rs.getString(NAME),new EmailAddress(rs.getString(EMAIL)),null));
rs.close();
Envelope<TranslatedMessage> envelope = null;
rs = select(ALL).from(TABLE_MESSAGES).where(ID,equal(messageId)).exec(db);
if (rs.next()){
var sender = userService().loadUser(rs.getLong(SENDER_USER_ID));
var msg = new TranslatedMessage(sender,rs.getString(SUBJECT),rs.getString(BODY), attachments);
envelope = new Envelope<>(messageId, msg, receivers);
}
rs.close();
if (envelope != null) return Optional.of(envelope);
return Optional.empty();
} catch (SQLException e) {
throw failedToLoadObject(Text.MESSAGE,id);
}
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({id}) not implemented!","class",getClass().getSimpleName(), ID ,id); // TODO
}
@Override
@@ -137,7 +163,7 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} INTEGER PRIMARY KEY, {2} VARCHAR(255) NOT N
var messageId = rs.getLong(ID);
var sender = userService().loadUser(rs.getLong(SENDER_USER_ID));
var msg = new TranslatedMessage(sender,rs.getString(SUBJECT),rs.getString(BODY), attachments.get(messageId));
var envelope = new Envelope<>(msg,user);
var envelope = new Envelope<>(messageId, msg, user);
envelopes.add(envelope);
}
rs.close();
@@ -177,14 +203,23 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} INTEGER PRIMARY KEY, {2} VARCHAR(255) NOT N
}
}
private void init() {
var version = createTables();
}
@Override
public Optional<Envelope<TranslatedMessage>> markRead(int hash, User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO
// TODO: throw exception if message not found!
public Optional<Envelope<TranslatedMessage>> markRead(long messageId, User user) {
try {
Query.delete().from(TABLE_RECEIVERS).where(MESSAGE_ID,equal(messageId)).where(EMAIL,equal(user.email().toString())).execute(db);
var rs = select(COUNT).from(TABLE_RECEIVERS).where(MESSAGE_ID,equal(messageId)).exec(db);
Integer count = null;
if (rs.next()) count = rs.getInt(1);
rs.close();
if (count != null && count == 0){
delete().from(TABLE_ATTACHMENTS).where(MESSAGE_ID,equal(messageId)).execute(db);
delete().from(TABLE_MESSAGES).where(ID,equal(messageId)).execute(db);
}
// TODO load message or fail
} catch (SQLException e) {
throw failedToDropObject(Text.RECEIVER);
}
}
@Override