refaturing message system, step 2: preparing new database-backed queue
Signed-off-by: Stephan Richter <s.richter@srsoftware.de>
This commit is contained in:
@@ -0,0 +1,15 @@
|
||||
package de.srsoftware.umbrella.message;
|
||||
|
||||
import de.srsoftware.umbrella.core.model.UmbrellaUser;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public interface MessageQueue<T> {
|
||||
public Stream<T> getEnvelopes();
|
||||
public Stream<T> getEnvelopesFor(UmbrellaUser user);
|
||||
public Optional<T> getEnvelope(int hash);
|
||||
public void markRead(int hash, UmbrellaUser user);
|
||||
public void push(T message);
|
||||
}
|
||||
@@ -79,7 +79,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
private final int port;
|
||||
private final SqliteMessageDb db;
|
||||
private Session session;
|
||||
private final List<Envelope> queue = new CopyOnWriteArrayList<>();
|
||||
private final List<Envelope<?>> queue = new CopyOnWriteArrayList<>();
|
||||
private String debugAddress;
|
||||
private final HashMap<Receiver,List<Exception>> exceptions = new HashMap<>();
|
||||
|
||||
@@ -140,7 +140,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
var head = path.pop();
|
||||
return switch (head){
|
||||
case SETTINGS -> patchSettings(ex,user.get());
|
||||
case READ -> patchState(ex,user.get(),path.pop());
|
||||
case READ -> markRead(ex,user.get(),path.pop());
|
||||
default -> super.doGet(path,ex);
|
||||
};
|
||||
} catch (NumberFormatException e){
|
||||
@@ -151,6 +151,8 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
}
|
||||
|
||||
private boolean getMessage(HttpExchange ex, UmbrellaUser user, int hash) throws IOException {
|
||||
var envel = db.getEnvelope(hash).filter(env -> env.isFor(user));
|
||||
if (envel.isPresent()) return sendMessage(ex, user, envel.get());
|
||||
var envelope = queue.stream()
|
||||
.filter(msg -> msg.isFor(user))
|
||||
.filter(msg -> msg.hashCode() == hash)
|
||||
@@ -164,6 +166,8 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
}
|
||||
|
||||
private boolean listMessages(HttpExchange ex, UmbrellaUser user) throws IOException {
|
||||
var msgs = db.getEnvelopesFor(user).map(e -> summary(e, user.language())).toList();
|
||||
if (!msgs.isEmpty()) return sendContent(ex,msgs);
|
||||
var messages = queue.stream().filter(e -> e.isFor(user)).map(e -> summary(e, user.language())).toList();
|
||||
return sendContent(ex,messages);
|
||||
}
|
||||
@@ -173,7 +177,7 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
for (var user : event.audience()){
|
||||
if (debugAddress != null && !debugAddress.equals(user.email().toString())) continue;
|
||||
var message = new TranslatableMessage(event.initiator(),event.subject(),event.describe(),null);
|
||||
var envelope = new Envelope(message,user);
|
||||
var envelope = new Envelope<>(message,user);
|
||||
send(envelope);
|
||||
}
|
||||
}
|
||||
@@ -192,9 +196,10 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
return sendContent(ex,db.update(user,settings));
|
||||
}
|
||||
|
||||
private boolean patchState(HttpExchange ex, UmbrellaUser user, String path) {
|
||||
private boolean markRead(HttpExchange ex, UmbrellaUser user, String path) {
|
||||
try {
|
||||
var hash = Integer.parseInt(path);
|
||||
db.markRead(hash, user);
|
||||
var envelope = queue.stream().filter(env -> env.hashCode() == hash).findFirst().orElse(null);
|
||||
if (envelope != null){
|
||||
envelope.receivers().remove(user);
|
||||
@@ -306,8 +311,9 @@ public class MessageSystem extends BaseHandler implements PostBox, EventListener
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Envelope envelope) {
|
||||
public void send(Envelope<?> envelope) {
|
||||
queue.add(envelope);
|
||||
db.push(envelope);
|
||||
new Thread(() -> processMessages(null)).start();
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ package de.srsoftware.umbrella.message;
|
||||
import static de.srsoftware.tools.jdbc.Condition.equal;
|
||||
import static de.srsoftware.tools.jdbc.Query.*;
|
||||
import static de.srsoftware.umbrella.core.Errors.*;
|
||||
import static de.srsoftware.umbrella.core.ResponseCode.HTTP_SERVER_ERROR;
|
||||
import static de.srsoftware.umbrella.core.constants.Constants.TABLE_SETTINGS;
|
||||
import static de.srsoftware.umbrella.core.constants.Field.*;
|
||||
import static de.srsoftware.umbrella.core.exceptions.UmbrellaException.*;
|
||||
@@ -13,6 +14,8 @@ import static java.text.MessageFormat.format;
|
||||
|
||||
import de.srsoftware.umbrella.core.constants.Text;
|
||||
import de.srsoftware.umbrella.core.exceptions.UmbrellaException;
|
||||
import de.srsoftware.umbrella.core.model.Envelope;
|
||||
import de.srsoftware.umbrella.core.model.TranslatedMessage;
|
||||
import de.srsoftware.umbrella.core.model.UmbrellaUser;
|
||||
import de.srsoftware.umbrella.message.model.Instantly;
|
||||
import de.srsoftware.umbrella.message.model.Settings;
|
||||
@@ -21,8 +24,11 @@ import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class SqliteMessageDb implements MessageDb{
|
||||
public class SqliteMessageDb implements MessageDb, MessageQueue<Envelope<TranslatedMessage>> {
|
||||
private static final System.Logger LOG = System.getLogger(SqliteMessageDb.class.getSimpleName());
|
||||
private final Connection db;
|
||||
private static final String DB_VERSION = "message_db_version";
|
||||
@@ -80,6 +86,21 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
|
||||
return createSettingsTable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Envelope<TranslatedMessage>> getEnvelope(int hash) {
|
||||
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelope({hash}) not implemented!","class",getClass().getSimpleName(),"hash",hash); // TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Envelope<TranslatedMessage>> getEnvelopes() {
|
||||
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getMessages() not implemented!","class",getClass().getSimpleName()); // TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<Envelope<TranslatedMessage>> getEnvelopesFor(UmbrellaUser user) {
|
||||
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.getEnvelopesFor({user}) not implemented!","class",getClass().getSimpleName(),"user",user.name()); // TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings getSettings(UmbrellaUser user) throws UmbrellaException {
|
||||
try {
|
||||
@@ -98,6 +119,17 @@ CREATE TABLE IF NOT EXISTS {0} ( {1} VARCHAR(255) PRIMARY KEY, {2} VARCHAR(255)
|
||||
var version = createTables();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void markRead(int hash, UmbrellaUser user) {
|
||||
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.markRead(hash, user) not implemented!","class",getClass().getSimpleName()); // TODO
|
||||
// TODO: throw exception if message not found!
|
||||
}
|
||||
|
||||
@Override
|
||||
public void push(Envelope<TranslatedMessage> message) {
|
||||
throw new UmbrellaException(HTTP_SERVER_ERROR,"{class}.push(message) not implemented!","class",getClass().getSimpleName()); // TODO
|
||||
}
|
||||
|
||||
private Settings toSettings(ResultSet rs) throws SQLException {
|
||||
var submission = rs.getString(VALUE);
|
||||
if (submission.trim().equalsIgnoreCase(INSTANTLY)) return new Instantly();
|
||||
|
||||
Reference in New Issue
Block a user