2017-11-23 4 views
1

Ich arbeite an einfachen Chat-Modul für meine Anwendung mit Spring WebFlux mit ReactiveMongoRepository auf Back-End und Angular 4 auf der Vorderseite. Ich bin in der Lage, Daten über WebSocketSession zu empfangen, aber nach dem Streaming aller Nachrichten von db möchte ich die Verbindung behalten, damit ich die Nachrichtenliste aktualisieren konnte. Kann mir jemand Hinweise geben, wie ich das erreichen kann, oder vielleicht folge ich falschen Annahmen?Spring WebFlux reaktive WebSocket verhindern Verbindung schließen

Java Backend verantwortlich für WebSocket, mein Teilnehmer meldet sich nur aktuelle Zustand, nichts relevant dort:

WebFluxConfiguration:

@Configuration 
@EnableWebFlux 
public class WebSocketConfig { 

private final WebSocketHandler webSocketHandler; 

@Autowired 
public WebSocketConfig(WebSocketHandler webSocketHandler) { 
    this.webSocketHandler = webSocketHandler; 
} 

@Bean 
@Primary 
public HandlerMapping webSocketMapping() { 
    Map<String, Object> map = new HashMap<>(); 
    map.put("/websocket-messages", webSocketHandler); 

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); 
    mapping.setOrder(10); 
    mapping.setUrlMap(map); 
    return mapping; 
} 

@Bean 
public WebSocketHandlerAdapter handlerAdapter() { 
    return new WebSocketHandlerAdapter(); 
} 


} 

WebSocketHandler Implementierung

@Component 
public class MessageWebSocketHandler implements WebSocketHandler { 

private final MessageRepository messageRepository; 
private ObjectMapper mapper = new ObjectMapper(); 
private MessageSubscriber subscriber = new MessageSubscriber(); 

@Autowired 
public MessageWebSocketHandler(MessageRepository messageRepository) { 
    this.messageRepository = messageRepository; 
} 

@Override 
    public Mono<Void> handle(WebSocketSession session) { 
    session.receive() 
      .map(WebSocketMessage::getPayloadAsText) 
      .map(this::toMessage) 
      .subscribe(subscriber::onNext, subscriber:: onError, subscriber::onComplete); 
    return session.send(
      messageRepository.findAll() 
        .map(this::toJSON) 
        .map(session::textMessage)); 
} 

private String toJSON(Message message) { 
    try { 
     return mapper.writeValueAsString(message); 
    } catch (JsonProcessingException e) { 
     throw new RuntimeException(e); 
    } 
} 

private Message toMessage(String json) { 
    try { 
     return mapper.readValue(json, Message.class); 
    } catch (IOException e) { 
     throw new RuntimeException("Invalid JSON:" + json, e); 
    } 
} 
} 

und MongoRepo

@Repository 
public interface MessageRepository extends 
ReactiveMongoRepository<Message,String> { 
} 
Handhabung

FrontEnd:

@Injectable() 
export class WebSocketService { 
    private subject: Rx.Subject<MessageEvent>; 

    constructor() { 
    } 

    public connect(url): Rx.Subject<MessageEvent> { 
    if (!this.subject) { 
     this.subject = this.create(url); 
     console.log('Successfully connected: ' + url); 
    } 
    return this.subject; 
    } 

    private create(url): Rx.Subject<MessageEvent> { 
    const ws = new WebSocket(url); 
    const observable = Rx.Observable.create(
     (obs: Rx.Observer<MessageEvent>) => { 
     ws.onmessage = obs.next.bind(obs); 
     ws.onerror = obs.error.bind(obs); 
     ws.onclose = obs.complete.bind(obs); 
     return ws.close.bind(ws); 
     }); 
    const observer = { 
     next: (data: Object) => { 
     if (ws.readyState === WebSocket.OPEN) { 
      ws.send(JSON.stringify(data)); 
     } 
     } 
    }; 
    return Rx.Subject.create(observer, observable); 
    } 
} 

in anderem Dienst Ich bin Abbilden beobachtbaren von Antwort auf meine Art

constructor(private wsService: WebSocketService) { 
    this.messages = <Subject<MessageEntity>>this.wsService 
     .connect('ws://localhost:8081/websocket-messages') 
     .map((response: MessageEvent): MessageEntity => { 
     const data = JSON.parse(response.data); 
     return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links); 
     }); 
    } 

und schließlich Abo mit Sendefunktion, die ich wegen der geschlossenen Verbindung nicht verwenden können:

ngOnInit() { 
    this.messages = []; 
    this._ws_subscription = this.chatService.messages.subscribe(
     (message: MessageEntity) => { 
     this.messages.push(message); 
     }, 
     error2 => { 
     console.log(error2.json()); 
     }, 
    () => { 
     console.log('Closed'); 
     } 
    ); 
    } 

    sendTestMessage() { 
    this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null)); 
    } 

Antwort

1

Vorausgesetzt, Ihre Chatnachrichten werden im Datenspeicher gespeichert, während sie empfangen werden, können Sie die ta verwenden iilable Cursors sind in Spring Data MongoDB Reactive enthalten (siehe reference documentation).

So könnte man eine neue Methode auf Ihrem Repository wie erstellen:

public interface MessageRepository extends ReactiveSortingRepository< Message, String> { 

    @Tailable 
    Flux<Message> findWithTailableCursor(); 
} 

Beachten Sie, dass tailable Cursor einige Einschränkungen: Sie Mongo Sammlung gekappt werden muss, und Einträge werden in der Reihenfolge ihrer Einfügung gestreamt.

Die WebFlux-Websocket-Unterstützung von Spring unterstützt STOMP- und Nachrichtenbroker noch nicht, aber dies könnte die bessere Wahl für einen solchen Anwendungsfall sein.

+0

Ok, also im Grunde nur WebFlux verwenden ist es unmöglich, die Verbindung für Benachrichtigungszwecke zu halten, oder? Es ist ein Forschungsprojekt für mich, das ist ein bisschen enttäuschend, danke für die Alternative. Ich werde es auf jeden Fall überprüfen. – MiCkl

+0

Das habe ich nicht gesagt. Sie können die Verbindung geöffnet lassen, solange die Quelle 'Flux' geöffnet bleibt. In Ihrem Fall sendet das Repository ein "onComplete" -Signal, wenn es mit dem Streaming von Einträgen fertig ist. Wie sonst würden Sie wissen, ob es mehr Ergebnisse aus der Datenbank gibt oder nicht? Sie suchen hier nach einem unendlichen Stream-Anwendungsfall, und Sie benötigen eine Quelle, die sich so verhält. –

+0

Oh ok ich bekomme es jetzt, ich habe mir die Dokumentation angeschaut und @Tailable Annotation scheint die Antwort zu sein, nach der ich gesucht habe. Danke nochmal, jetzt kann ich weitermachen; D – MiCkl

Verwandte Themen