NGSITestSink
Functionality
com.telefonica.iot.cygnus.sinks.NGSITestSink, or simply NGSITestSink, is a sink designed to test Cygnus when receiving NGSI-like context data events. Usually, such context data is notified by an Orion Context Broker instance, but it could come from any other system speaking the NGSI language.
Independently of the data generator, NGSI context data is always transformed into internal Flume events at Cygnus sources. In the end, the information within these Flume events is not persisted in any real storage, but simply logged (depending on your log4j configuration, the logs are printed to the console, a file, etc.).
The next sections explain this in detail.
Mapping NGSI events to Flume events
Notified NGSI events (containing context data) are transformed into Flume events (such an event is a mix of certain headers and a byte-based body), independently of the NGSI data generator or the final backend where it is persisted.
This is done at the Cygnus HTTP listeners (in Flume jargon, sources) thanks to NGSIRestHandler. Once translated, the data (now a Flume event) is put into the internal channels for future consumption (see next section).
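The translation step can be pictured with a minimal Python sketch. It is not the NGSIRestHandler code (which is Java); the function name `notification_to_event` and the exact set of headers it fills are assumptions made for illustration, loosely following the header names used in the example of this document.

```python
import json
import time


def notification_to_event(payload: str, service: str, service_path: str) -> dict:
    """Build a Flume-like event (headers plus byte body) from an NGSI notification.

    Hypothetical helper: it only mimics the shape of the event, not Cygnus internals.
    """
    notification = json.loads(payload)
    # One "<id>_<type>" token per notified entity, as in the notified-entities header.
    entities = [
        "{}_{}".format(r["contextElement"]["id"], r["contextElement"]["type"])
        for r in notification["contextResponses"]
    ]
    headers = {
        "content-type": "application/json",
        "timestamp": str(int(time.time())),
        "fiware-service": service,
        "fiware-servicepath": service_path,
        "notified-entities": ",".join(entities),
    }
    # The body is kept as raw bytes, as in a Flume event.
    return {"headers": headers, "body": payload.encode("utf-8")}


payload = json.dumps({
    "contextResponses": [
        {
            "contextElement": {
                "id": "car1",
                "type": "car",
                "attributes": [{"name": "speed", "type": "float", "value": "112.9"}],
            }
        }
    ]
})
event = notification_to_event(payload, "vehicles", "4wheels")
```

The resulting dictionary has the same two parts as the Flume event described above: a set of string headers and a byte-based body.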
Mapping Flume events to logs
The mapping is direct: the context data is converted into strings that are written to the console, a file, etc.
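As a rough illustration of such a direct mapping, the Python sketch below flattens a decoded event into a single line and hands it to a logger. The function `render_event` and the output layout are assumptions chosen for readability; the real sink produces its own message format through log4j.

```python
import logging


def render_event(headers: dict, body: dict) -> str:
    """Flatten a decoded event into one human-readable line (illustrative layout)."""
    attributes = ", ".join(
        "{}({})={}".format(a["attrName"], a["attrType"], a["attrValue"])
        for a in body["attributes"]
    )
    return "service={} servicePath={} entity={}_{} attributes=[{}]".format(
        headers["fiware-service"],
        headers["fiware-servicepath"],
        body["entityId"],
        body["entityType"],
        attributes,
    )


headers = {"fiware-service": "vehicles", "fiware-servicepath": "4wheels"}
body = {
    "entityId": "car1",
    "entityType": "car",
    "attributes": [
        {"attrName": "speed", "attrType": "float", "attrValue": "112.9"},
        {"attrName": "oil_level", "attrType": "float", "attrValue": "74.6"},
    ],
}
line = render_event(headers, body)

# Where the line ends up (console, file...) depends only on the logging setup.
logging.basicConfig(level=logging.INFO)
logging.getLogger("test-sink").info(line)
```

Nothing is stored anywhere: the only side effect is the log record, which is the behaviour this sink is meant to have.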
Example
Assuming the following Flume event is created from a notified NGSI context data (the code below is an object representation, not any real data format):
flume-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
ttl=10,
fiware-service=vehicles,
fiware-servicepath=4wheels,
notified-entities=car1_car,
notified-servicepaths=4wheels,
grouped-entities=car1_car,
grouped-servicepath=4wheels
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
Assuming the log appender is the console, NGSITestSink will log the data within the body as:
time=2015-12-10T14:31:49.389CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[150] : Starting transaction (1429535775-308-0000000000)
time=2015-12-10T14:31:49.392CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[232] : Received data ({ "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "attributes" : [ { "name" : "speed", "type" : "float", "value" : "112.9" }, { "name" : "oil_level", "type" : "float", "value" : "74.6" } ], "type" : "car", "isPattern" : "false", "id" : "car1" }, "statusCode" : { "code" : "200", "reasonPhrase" : "OK" } } ]})
time=2015-12-10T14:31:49.394CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[255] : Event put in the channel (id=1491400742, ttl=10)
time=2015-12-10T14:31:49.485CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=persistAggregation | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.NGSITestSink[176] : [test-sink] Persisting data at NGSITestSink. Data (Processing event={Processing headers={recvTimeTs= 1429535775, fiwareService=vehicles, fiwareServicePath=4wheels, destinations=car1_car}, Processing context element={id=car1, type=car}, Processing attribute={name=speed, type=float, value="112.9", metadata=[]}, Processing attribute={name=oil_level, type=float, value="74.6", metadata=[]}})
time=2015-12-10T14:31:49.486CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=process | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.NGSISink[178] : Finishing transaction (1429535775-308-0000000000)
Administration guide
Configuration
NGSITestSink is configured through the following parameters:
Parameter | Mandatory | Default value | Comments |
---|---|---|---|
type | yes | N/A | Must be com.telefonica.iot.cygnus.sinks.NGSITestSink |
channel | yes | N/A | |
enable_grouping | no | false | true or false. |
enable_lowercase | no | false | true or false. |
data_model | no | dm-by-entity | Always dm-by-entity, even if not configured. |
batch_size | no | 1 | Number of events accumulated before persistence. |
batch_timeout | no | 30 | Number of seconds the batch will be building before it is persisted as it is. |
batch_ttl | no | 10 | Number of retries when a batch cannot be persisted. Use 0 for no retries, -1 for infinite retries. Please note that an infinite TTL (or even a very large one) may consume all the sink's channel capacity very quickly. |
A configuration example could be:
cygnusagent.sinks = test-sink
cygnusagent.channels = test-channel
...
cygnusagent.sinks.test-sink.type = com.telefonica.iot.cygnus.sinks.NGSITestSink
cygnusagent.sinks.test-sink.channel = test-channel
cygnusagent.sinks.test-sink.enable_grouping = false
cygnusagent.sinks.test-sink.enable_lowercase = false
cygnusagent.sinks.test-sink.data_model = dm-by-entity
cygnusagent.sinks.test-sink.batch_size = 100
cygnusagent.sinks.test-sink.batch_timeout = 30
cygnusagent.sinks.test-sink.batch_ttl = 10
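A common mistake in this kind of configuration is binding a sink to a channel that is not declared in the agent's channel list. The following Python sketch is one possible way to check for that before starting the agent; the helpers `parse_properties` and `undeclared_sink_channels` are hypothetical and are not part of Cygnus or Flume.

```python
def parse_properties(text: str) -> dict:
    """Parse simple 'key = value' lines, skipping blanks, comments and '...' fillers."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def undeclared_sink_channels(props: dict, agent: str) -> dict:
    """Return {sink: channel} for every sink whose channel is not declared."""
    declared = set(props.get(agent + ".channels", "").split())
    problems = {}
    for sink in props.get(agent + ".sinks", "").split():
        channel = props.get("{}.sinks.{}.channel".format(agent, sink))
        if channel not in declared:
            problems[sink] = channel
    return problems


config = """
cygnusagent.sinks = test-sink
cygnusagent.channels = test-channel
cygnusagent.sinks.test-sink.type = com.telefonica.iot.cygnus.sinks.NGSITestSink
cygnusagent.sinks.test-sink.channel = test-channel
"""
props = parse_properties(config)
problems = undeclared_sink_channels(props, "cygnusagent")
```

An empty result means every sink points at a declared channel; any entry in the result names a sink and the channel it could not be matched to.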
Use cases
Use this sink to test whether a Cygnus deployment is properly receiving notifications from an Orion Context Broker instance.
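When no Orion Context Broker is at hand, a notification can be emulated by hand. The Python sketch below only builds such a request, reusing the payload shown in the example log. The endpoint `http://localhost:5050/notify` and the `Fiware-Service` and `Fiware-ServicePath` header names are assumptions about a typical deployment; adjust them to the HTTP source actually configured in your agent.

```python
import json
import urllib.request

# Assumed endpoint: adjust host, port and path to your own Cygnus HTTP source.
CYGNUS_URL = "http://localhost:5050/notify"


def build_notification(url: str, service: str, service_path: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying an NGSI-like notification."""
    payload = {
        "subscriptionId": "51c0ac9ed714fb3b37d7d5a8",
        "originator": "localhost",
        "contextResponses": [
            {
                "contextElement": {
                    "attributes": [
                        {"name": "speed", "type": "float", "value": "112.9"},
                        {"name": "oil_level", "type": "float", "value": "74.6"},
                    ],
                    "type": "car",
                    "isPattern": "false",
                    "id": "car1",
                },
                "statusCode": {"code": "200", "reasonPhrase": "OK"},
            }
        ],
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Fiware-Service": service,
            "Fiware-ServicePath": service_path,
        },
        method="POST",
    )


request = build_notification(CYGNUS_URL, "vehicles", "4wheels")
# Uncomment to actually send it to a running Cygnus agent:
# urllib.request.urlopen(request)
```

If the deployment is working, sending the request should make the sink print log lines similar to those in the example section.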
Important notes
About batching
NGSITestSink extends NGSISink, which provides a built-in batch-based mechanism for collecting events from the internal Flume channel. Thanks to this mechanism, extending classes only have to deal with the details of persisting such a batch of events in the final backend.
The key point of the batch mechanism is that it greatly increases sink performance, because the number of writes is dramatically reduced. This matters little for this test sink, but the other sinks benefit considerably from it. Please check the specific sink documentation for more details.
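The interplay between batch_size and batch_timeout can be sketched in a few lines of Python. This is not the NGSISink implementation; the class `BatchAccumulator` and its methods are hypothetical, and retries (batch_ttl) are left out to keep the sketch short.

```python
import time


class BatchAccumulator:
    """Collect events and release them when the batch is full or has timed out."""

    def __init__(self, batch_size: int, batch_timeout: float, clock=time.monotonic):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout  # seconds
        self._clock = clock
        self._events = []
        self._started = None

    def add(self, event):
        """Add one event; return the finished batch, or None if still building."""
        if not self._events:
            self._started = self._clock()  # the timeout counts from the first event
        self._events.append(event)
        return self._flush_if_ready()

    def poll(self):
        """Call when the channel is empty, so a partial batch can still time out."""
        return self._flush_if_ready()

    def _flush_if_ready(self):
        if not self._events:
            return None
        full = len(self._events) >= self.batch_size
        expired = self._clock() - self._started >= self.batch_timeout
        if full or expired:
            batch, self._events = self._events, []
            return batch
        return None


accumulator = BatchAccumulator(batch_size=3, batch_timeout=30)
results = [accumulator.add(n) for n in range(4)]
# results == [None, None, [0, 1, 2], None]
```

With a batch size of 3, three events trigger a single write instead of three, which is where the reduction in writes comes from; the timeout guarantees that a partially filled batch is eventually persisted as it is.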