Ejemplo de uso del CEP

 

INTERFAZ DE COMUNICACIÓN MQTT

El µCEP utiliza el protocolo de mensajería MQTT para suscribirse a mensajes entrantes y publicar eventos salientes. Se puede utilizar cualquier broker MQTT deseado: por ejemplo, los servidores públicos de HiveMQ; o bien un servidor Mosquitto desplegado en una máquina propia; así mismo, se puede solicitar una cuenta en el broker MQTT privado de Cosmos rellenando el formulario de contacto. En cualquier caso, se debe recurrir al archivo confFile.ini para definir los topics de subscripción y publicación (mensajes entrantes y eventos salientes), así como los parámetros de conexión al broker MQTT:

[MQTT]

portMQTT=1883

hostMQTT=broker.hivemq.com

topicMQTTCollect=mobilityLabs/cosmos/input/messages

topicMQTTPublish=mobilityLabs/cosmos/output/events

#userMQTT=none

#passwordMQTT=none

 

Archivo de reglas DOLCE

Con respecto a las reglas (condiciones) a aplicar a un conjunto de mensajes entrantes al µCEP, se tiene que editar el fichero detect.dolce. En él se establece los tipos de datos que se van a procesar, las condiciones que deben cumplirse para generar un evento y el formato que tendrán estos eventos (eventos complejos) de salida.

Cabe destacar que en el lenguaje DOLCE se modela un MENSAJE ENTRANTE con la cláusula EVENT, mientras que se modela un EVENTO SALIENTE con la cláusula COMPLEX EVENT.

/*

Definition of temperature event.

It has the following attributes:

– a SensorId: To identify the sensor

– a Description: To add some information about the sensor

– a Position: Where the sensor is placed

– a ReadingTime: When the sensor has performed this reading

– a Value: The temperature value

The accept statement allows filtering. In this case, we are going to consider only some sensor Ids

*/

event TemperatureReading

{

use

{

int SensorId,

string Description,

pos Position,

time ReadingTime,

float Value

};

accept { SensorId > 10 };

}

/*

This complex definition allows calculating the average temperature.

– The detect statement defines that this Complex will consider only as input the TemperatureReading signal

– The where statement allows also to filter some values. In this case, we only want to consider value greater than -10

– The payload defines the attributes of the complex event. In this examples, we only want to calculate the temperature average.

*/
complex AverageTemperature
{

payload {

float average = avg(Value)

};

detect TemperatureReading where (Value > -10) in [10];
}

 

Finalmente, el mensaje de entrada de prueba que se puede publicar podría ser tal que así:

1 TemperatureReading int SensorId 10 string Description “Temperature sensor placed in Madrid, central train station” pos Position 7.675946\45.069258 time ReadingTime 2016-05-13T11:47:37Z float Value 25.7 

1 TemperatureReading int SensorId 11 string Description “Temperature sensor placed in Seville, central train station” pos Position 45.675946\100.069258 time ReadingTime 2016-05-13T11:47:37Z float Value 35.7 

1 TemperatureReading int SensorId 14 string Description “Temperature sensor placed in Madrid, airport” pos Position 8.675946\47.069258 time ReadingTime 2016-05-13T11:47:37Z float Value 28.7

 

Como se puede comprobar, un mensaje entrante comienza siempre con el código 1, seguido del nombre usado para modelar el mensaje entrante (evento entrante), en este caso TemperatureReading. Tras esto, se envía cada tuple TPV (Type, Parameter, Value), es decir, int SensorId10, float Value 25, etc., en el orden que se quiera, ya que no necesariamente tiene que coincidir con el modelo definido. Con respecto a los eventos salientes (complex events), siguen el mismo formato y su contenido es el establecido en la cláusula payload, si bien empezarán con el código 2.