Ich arbeite an einfachen Chat-Modul für meine Anwendung mit Spring WebFlux mit ReactiveMongoRepository auf Back-End und Angular 4 auf der Vorderseite. Ich bin in der Lage, Daten über WebSocketSession zu empfangen, aber nach dem Streaming aller Nachrichten von db möchte ich die Verbindung behalten, damit ich die Nachrichtenliste aktualisieren konnte. Kann mir jemand Hinweise geben, wie ich das erreichen kann, oder vielleicht folge ich falschen Annahmen?Spring WebFlux reaktive WebSocket verhindern Verbindung schließen
Java Backend verantwortlich für WebSocket, mein Teilnehmer meldet sich nur aktuelle Zustand, nichts relevant dort:
WebFluxConfiguration:
@Configuration
@EnableWebFlux
public class WebSocketConfig {
private final WebSocketHandler webSocketHandler;
@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
}
@Bean
@Primary
public HandlerMapping webSocketMapping() {
Map<String, Object> map = new HashMap<>();
map.put("/websocket-messages", webSocketHandler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(10);
mapping.setUrlMap(map);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
WebSocketHandler Implementierung
@Component
public class MessageWebSocketHandler implements WebSocketHandler {
private final MessageRepository messageRepository;
private ObjectMapper mapper = new ObjectMapper();
private MessageSubscriber subscriber = new MessageSubscriber();
@Autowired
public MessageWebSocketHandler(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}
@Override
public Mono<Void> handle(WebSocketSession session) {
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(this::toMessage)
.subscribe(subscriber::onNext, subscriber:: onError, subscriber::onComplete);
return session.send(
messageRepository.findAll()
.map(this::toJSON)
.map(session::textMessage));
}
private String toJSON(Message message) {
try {
return mapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
private Message toMessage(String json) {
try {
return mapper.readValue(json, Message.class);
} catch (IOException e) {
throw new RuntimeException("Invalid JSON:" + json, e);
}
}
}
und MongoRepo
@Repository
public interface MessageRepository extends
ReactiveMongoRepository<Message,String> {
}
Handhabung
FrontEnd:
@Injectable()
export class WebSocketService {
private subject: Rx.Subject<MessageEvent>;
constructor() {
}
public connect(url): Rx.Subject<MessageEvent> {
if (!this.subject) {
this.subject = this.create(url);
console.log('Successfully connected: ' + url);
}
return this.subject;
}
private create(url): Rx.Subject<MessageEvent> {
const ws = new WebSocket(url);
const observable = Rx.Observable.create(
(obs: Rx.Observer<MessageEvent>) => {
ws.onmessage = obs.next.bind(obs);
ws.onerror = obs.error.bind(obs);
ws.onclose = obs.complete.bind(obs);
return ws.close.bind(ws);
});
const observer = {
next: (data: Object) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(data));
}
}
};
return Rx.Subject.create(observer, observable);
}
}
in anderem Dienst Ich bin Abbilden beobachtbaren von Antwort auf meine Art
constructor(private wsService: WebSocketService) {
this.messages = <Subject<MessageEntity>>this.wsService
.connect('ws://localhost:8081/websocket-messages')
.map((response: MessageEvent): MessageEntity => {
const data = JSON.parse(response.data);
return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links);
});
}
und schließlich Abo mit Sendefunktion, die ich wegen der geschlossenen Verbindung nicht verwenden können:
ngOnInit() {
this.messages = [];
this._ws_subscription = this.chatService.messages.subscribe(
(message: MessageEntity) => {
this.messages.push(message);
},
error2 => {
console.log(error2.json());
},
() => {
console.log('Closed');
}
);
}
sendTestMessage() {
this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null));
}
Ok, also im Grunde nur WebFlux verwenden ist es unmöglich, die Verbindung für Benachrichtigungszwecke zu halten, oder? Es ist ein Forschungsprojekt für mich, das ist ein bisschen enttäuschend, danke für die Alternative. Ich werde es auf jeden Fall überprüfen. – MiCkl
Das habe ich nicht gesagt. Sie können die Verbindung geöffnet lassen, solange die Quelle 'Flux' geöffnet bleibt. In Ihrem Fall sendet das Repository ein "onComplete" -Signal, wenn es mit dem Streaming von Einträgen fertig ist. Wie sonst würden Sie wissen, ob es mehr Ergebnisse aus der Datenbank gibt oder nicht? Sie suchen hier nach einem unendlichen Stream-Anwendungsfall, und Sie benötigen eine Quelle, die sich so verhält. –
Oh ok ich bekomme es jetzt, ich habe mir die Dokumentation angeschaut und @Tailable Annotation scheint die Antwort zu sein, nach der ich gesucht habe. Danke nochmal, jetzt kann ich weitermachen; D – MiCkl