2016-04-28 9 views
0

Gibt es eine Möglichkeit, das MySQL-Db zu aktualisieren, ohne die leeren Werte im Stream zu aktualisieren? Wenn mein Eingabedatenstrom einige leere Werte enthält, wird dieser leere Wert derzeit mit dem Wert "data_empty" angegeben. Zu diesem Zeitpunkt aktualisiert CEP den DB mit diesem Wert ("data_empty"). Mein Ziel ist, ohne diesen leeren Wert den Rest der Dinge zu aktualisieren. Ist es möglich, siddhi und WSO2 CEP zu verwenden?Aktualisieren der MySQL-Datenbank mit WSO2 CEP

@Plan:name('DBUpdateExecutionPlan') 
@Import('testStream:1.0.0') 
define stream input (id string, param1 string, param2 string); 

@Export('testOutStream:1.0.0') 
define stream output (id string, param1 string, param2 string); 

@from(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep') 
define table cepTable (id string, param1 string, param2 string) ; 

from input#window.time(0 sec) 
select * 
update cepTable on id == cepTable.id; 

Antwort

3

Es ist irgendwie schwierig, nur die nicht leeren (nicht ‚data_empty‘ in Ihrem Szenario) Werte in die Datenbank zu aktualisieren. In Siddhi gibt es jedoch eine Funktion namens ifThenElse(condition, value if true, value if false), die in Ihrem Szenario verwendet werden kann. Lesen Sie den folgenden Beispiel-Ausführungsplan, um sich ein Bild von der Verwendung von ifThenElse() und der Aktualisierung von Tabellen zu machen (ähnlich wie in Ihrem Anwendungsfall).

@Plan:name('IfThenElseExecutionPlan') 

@Import('inputStream:1.0.0') 
define stream dataIn (roomId int, roomType string, roomTemp float); 

@Export('outputStream:1.0.0') 
define stream dataOut (roomId int, roomType string, roomTemp float); 

@From(eventtable='rdbms', datasource.name='cepdatabase', table.name='roomTable') 
define table roomTable (roomId int, roomType string, roomTemp float); 

from dataIn[not((roomTable.roomId == roomId) in roomTable)] 
insert into updateStream; 

from dataIn join roomTable 
on roomTable.roomId == dataIn.roomId 
select dataIn.roomId as roomId, 
     ifThenElse(dataIn.roomType=='data_empty', roomTable.roomType, dataIn.roomType) as roomType, 
     ifThenElse(dataIn.roomTemp==0.0f, roomTable.roomTemp, dataIn.roomTemp) as roomTemp 
insert into updateStream; 

from updateStream 
insert overwrite roomTable 
on roomTable.roomId == roomId; 
Verwandte Themen