started SQLite implementation of MessageQueue<TranslatedMessage>:

- push(Envelope) implemented
- getEnvelopesFor(User) implemented
- getReceivers() implemented

next things to implement:
- markRead(hash, user)
- getEnvelope(hash)

Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
2026-01-26 00:24:15 +01:00
parent fc1c735036
commit 94f41be391
4 changed files with 54 additions and 25 deletions

View File

@@ -22,6 +22,8 @@ public class Text {
public static final String DOCUMENT_TYPE_ID = "document type id"; public static final String DOCUMENT_TYPE_ID = "document type id";
public static final String DOCUMENT_WITH_ID = "document ({id})"; public static final String DOCUMENT_WITH_ID = "document ({id})";
public static final String EMAILS_FOR_RECEIVER = "emails for {email}";
public static final String INVALID_DB_CODE = "Encountered invalid dbCode: {code}"; public static final String INVALID_DB_CODE = "Encountered invalid dbCode: {code}";
public static final String ITEM = "item"; public static final String ITEM = "item";
public static final String ITEMS = "items"; public static final String ITEMS = "items";
@@ -41,6 +43,8 @@ public class Text {
public static final String PROPERTIES = "properties"; public static final String PROPERTIES = "properties";
public static final String PROPERTY = "property"; public static final String PROPERTY = "property";
public static final String RECEIVERS = "receivers";
public static final String SENDER = "sender"; public static final String SENDER = "sender";
public static final String SERVICE_WITH_ID = "service ({id})"; public static final String SERVICE_WITH_ID = "service ({id})";
public static final String SESSION = "session"; public static final String SESSION = "session";

View File

@@ -16,24 +16,23 @@ public class InMemoryQueue implements MessageQueue<TranslatedMessage> {
public static final System.Logger LOG = System.getLogger(InMemoryQueue.class.getSimpleName()); public static final System.Logger LOG = System.getLogger(InMemoryQueue.class.getSimpleName());
private LinkedList<Envelope<TranslatedMessage>> queue = new LinkedList<>(); private LinkedList<Envelope<TranslatedMessage>> queue = new LinkedList<>();
@Override private Stream<Envelope<TranslatedMessage>> envelopes(){
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
return queue.stream(); return queue.stream();
} }
@Override @Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) { public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
return getEnvelopes().filter(env -> env.isFor(user)).toList(); return envelopes().filter(env -> env.isFor(user)).toList();
} }
@Override @Override
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) { public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
return getEnvelopes().filter(env -> env.hashCode() == hash).findAny(); return envelopes().filter(env -> env.hashCode() == hash).findAny();
} }
@Override @Override
public Stream<User> getReceivers() { public Stream<User> getReceivers() {
return getEnvelopes().map(Envelope::receivers).flatMap(Set::stream).distinct(); return envelopes().map(Envelope::receivers).flatMap(Set::stream).distinct();
} }
@Override @Override

View File

@@ -9,7 +9,6 @@ import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
public interface MessageQueue<T extends Message<?>> { public interface MessageQueue<T extends Message<?>> {
public Stream<Envelope<T>> getEnvelopes();
public List<Envelope<T>> getEnvelopesFor(User user); public List<Envelope<T>> getEnvelopesFor(User user);
public Optional<Envelope<T>> getEnvelope(int hash); public Optional<Envelope<T>> getEnvelope(int hash);
Stream<User> getReceivers(); Stream<User> getReceivers();

View File

@@ -2,10 +2,12 @@
package de.srsoftware.umbrella.message; package de.srsoftware.umbrella.message;
import static de.srsoftware.tools.jdbc.Condition.equal; import static de.srsoftware.tools.jdbc.Condition.equal;
import static de.srsoftware.tools.jdbc.Condition.in;
import static de.srsoftware.tools.jdbc.Query.*; import static de.srsoftware.tools.jdbc.Query.*;
import static de.srsoftware.tools.jdbc.Query.SelectQuery.ALL;
import static de.srsoftware.umbrella.core.Errors.*; import static de.srsoftware.umbrella.core.Errors.*;
import static de.srsoftware.umbrella.core.ModuleRegistry.userService;
import static de.srsoftware.umbrella.core.ResponseCode.HTTP_SERVER_ERROR; import static de.srsoftware.umbrella.core.ResponseCode.HTTP_SERVER_ERROR;
import static de.srsoftware.umbrella.core.constants.Constants.TABLE_SETTINGS;
import static de.srsoftware.umbrella.core.constants.Field.*; import static de.srsoftware.umbrella.core.constants.Field.*;
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*; import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*;
import static de.srsoftware.umbrella.core.model.Translatable.t; import static de.srsoftware.umbrella.core.model.Translatable.t;
@@ -14,26 +16,17 @@ import static de.srsoftware.umbrella.message.model.Schedule.schedule;
import static java.text.MessageFormat.format; import static java.text.MessageFormat.format;
import static java.time.ZoneOffset.UTC; import static java.time.ZoneOffset.UTC;
import de.srsoftware.tools.jdbc.Query;
import de.srsoftware.umbrella.core.BaseDb; import de.srsoftware.umbrella.core.BaseDb;
import de.srsoftware.umbrella.core.constants.Field;
import de.srsoftware.umbrella.core.constants.Text; import de.srsoftware.umbrella.core.constants.Text;
import de.srsoftware.umbrella.core.exceptions.UmbrellaException; import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
import de.srsoftware.umbrella.core.model.Envelope; import de.srsoftware.umbrella.core.model.*;
import de.srsoftware.umbrella.core.model.TranslatedMessage;
import de.srsoftware.umbrella.core.model.UmbrellaUser;
import de.srsoftware.umbrella.core.model.User;
import de.srsoftware.umbrella.message.model.Instantly; import de.srsoftware.umbrella.message.model.Instantly;
import de.srsoftware.umbrella.message.model.Settings; import de.srsoftware.umbrella.message.model.Settings;
import de.srsoftware.umbrella.message.model.Silent; import de.srsoftware.umbrella.message.model.Silent;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.ZoneOffset; import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream; import java.util.stream.Stream;
public class SqliteMessageDb extends BaseDb implements MessageDb, MessageQueue<TranslatedMessage> { public class SqliteMessageDb extends BaseDb implements MessageDb, MessageQueue<TranslatedMessage> {
@@ -121,19 +114,53 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} INTEGER PRIMARY KEY, {2} VARCHAR(255) NOT N
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({hash}) not implemented!","class",getClass().getSimpleName(),"hash",hash); // TODO throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({hash}) not implemented!","class",getClass().getSimpleName(),"hash",hash); // TODO
} }
@Override
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getMessages() not implemented!","class",getClass().getSimpleName()); // TODO
}
@Override @Override
public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) { public List<Envelope<TranslatedMessage>> getEnvelopesFor(User user) {
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO try {
var messageIds = new ArrayList<Long>();
var rs = select(MESSAGE_ID).from(TABLE_RECEIVERS).where(EMAIL,equal(user.email().toString())).where(NAME,equal(user.name())).exec(db);
while (rs.next()) messageIds.add(rs.getLong(MESSAGE_ID));
rs.close();
var attachments = new HashMap<Long,Collection<Attachment>>();
rs = select(ALL).from(TABLE_ATTACHMENTS).where(MESSAGE_ID,in(messageIds.toArray())).exec(db);
while (rs.next()){
var attachment = new Attachment(rs.getString(NAME),rs.getString(MIME),rs.getBytes(DATA));
var messageId = rs.getLong(MESSAGE_ID);
attachments.computeIfAbsent(messageId, k -> new ArrayList<>()).add(attachment);
}
rs.close();
var envelopes = new ArrayList<Envelope<TranslatedMessage>>();
rs = select(ALL).from(TABLE_MESSAGES).where(ID,in(messageIds.toArray())).exec(db);
while (rs.next()){
var messageId = rs.getLong(ID);
var sender = userService().loadUser(rs.getLong(SENDER_USER_ID));
var msg = new TranslatedMessage(sender,rs.getString(SUBJECT),rs.getString(BODY), attachments.get(messageId));
var envelope = new Envelope<>(msg,user);
envelopes.add(envelope);
}
rs.close();
return envelopes;
} catch (SQLException e) {
throw failedToLoadObject(t(Text.EMAILS_FOR_RECEIVER,EMAIL,user.email()));
}
} }
@Override @Override
public Stream<User> getReceivers() { public Stream<User> getReceivers() {
return Stream.empty(); try {
var receivers = new ArrayList<User>();
var rs = select(format("DISTINCT {0}, {1}",EMAIL,NAME)).from(TABLE_RECEIVERS).exec(db);
while (rs.next()){
var email = new EmailAddress(rs.getString(EMAIL));
receivers.add(new User(rs.getString(NAME),email,null));
}
rs.close();
return receivers.stream();
} catch (SQLException e) {
throw failedToLoadObject(t(Text.RECEIVERS));
}
} }
@Override @Override