NGSIRestHandler
Content:
Functionality
Mapping NGSI events to flume events
This section explains how a notified NGSI event (an http message) containing context data is converted into a Flume event (an object in memory or a file), suitable for being consumed by any of the Cygnus sinks, thanks to NGSIRestHandler.
It is necessary to remark again this handler is designed for being used by HttpSource, the native component of Apache Flume. An http message containing a NGSI-like notification will be received by HttpSource and passed to NGSIRestHandler in order to create one, and only one, Flume event object to be put in a sink's channel (mainly, these channels are objects in memory, or files).
On the one hand, the http message containing the NGSI-like notification will be composed of a set of http headers, and a payload. On the other hand, a Flume event object is composed of a set of headers, and a body. As can be seen, there is a quasi-direct translation among http message and Flume event object:
| http message | Flume event object |
|---|---|
Content-Type header |
content-type header (discarded from 0.14.0) |
Fiware-Service header |
fiware-service header |
Fiware-ServicePath header |
fiware-servicepath header |
Fiware-Correlator header |
fiware-correlator header |
| any other header | discarded |
| payload | body |
All the FIWARE headers are added to the Flume event object if notified. If not, default values are used (it is the case of fiware-service and fiware-servicepath, which take the configured value of default_service and default_service_path respectively, see below the configuration section) or auto-generated (it is the case of fiware-correlator).
In addition to the fiware-correlator, a transaction-id is created for internally identify a complete Cygnus transaction, i.e. starting at the source when the context data is notified, and finishing in the sink, where such data is finally persisted. If Fiware-Correlator header is not notified, then fiware-correlator and transactionid get the same auto-generated value.
Additional headers added by Flume interceptors
Despite all the details about interceptors used in Cygnus are widely documented here, it is worth reminding that:
- An interceptor is a piece of code in charge of "intercepting" events before they are put in the sink's channel and modifying them by adding/removing/modifying a header.
- A
timestampheader is added by the nativeTimestampInterceptor. It is expressed as a Unix time. - A
notified-entitiesheader is added by the customGroupingInterceptor. This header contains one default destination per each notified context element. It is used by the sinks when the grouping rules are not enabled. - A
grouped-entitiesheader is added by the customGroupingInterceptor. This header contains one grouped destination per each notified context element. It is used by the sinks when the grouping rules are enabled. - A
grouped-servicepathheader is added by the customGroupingInterceptor. This header contains one grouped service path per each notified context element. It is used by the sinks when the grouping rules are enabled.
Example
A NGSI-like event example could be (the code below is an object representation, not any real data format; look for it at Orion documentation):
ngsi-event={
http-headers={
Content-Length: 492
Host: localhost:1028
Accept: application/json
Content-Type: application/json
Fiware-Service: vehicles
Fiware-ServicePath: /4wheels
Fiware-Correlator: ABCDEF1234567890
},
payload={
{
"subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",
"originator" : "localhost",
"contextResponses" : [
{
"contextElement" : {
"attributes" : [
{
"name" : "speed",
"type" : "float",
"value" : "112.9",
"metadatas": []
},
{
"name" : "oil_level",
"type" : "float",
"value" : "74.6",
"metadatas": []
}
],
"type" : "car",
"isPattern" : "false",
"id" : "car1"
},
"statusCode" : {
"code" : "200",
"reasonPhrase" : "OK"
}
]
}
}
}
As said, Flume events are not much more different than the above representation: there is a set of headers and a body. This is an advantage, since allows for a quick translation between formats. The equivalent object representation (not any real data format) for such a notified NGSI event could be the following Flume event:
flume-event={
headers={
timestamp=1429535775,
transaction-id=0123456789ABCDEF,
fiware-correlator=ABCDEF1234567890,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
notified-entities=car1_car,
grouped-entities=cars,
grouped-servicepath=/mycars
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
Administration guide
Configuration
NGSIRestHandler is configured through the following parameters:
| Parameter | Mandatory | Default value | Comments |
|---|---|---|---|
| notification_target | no | notify/ |
Any other configured value must start with /. |
| default_service | no | default |
Alphanumerics and underscores are only accepted. |
| default_service_path | no | / |
/ is the root service path (also know as root subservice). Any other configured value must start with /. Apart from the initial slash, alphanumerics and underscores are only accepted. |
A configuration example could be:
cygnusagent.sources = http-source
...
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnusagent.sources.http-source.notification_target = /notify
cygnusagent.sources.http-source.default_service = default
cygnusagent.sources.http-source.default_service_path = /
Accepted character set
This handler for NGSI only works with UTF-8 encoding. Thus, notifications must send a Content-Type header with application/json; charset=utf-8 as value. Any other content type wont be considered and the notification will be discarded.
It is expected UTF-8 character set is maintained by all the Flume elements in the configuration, in order the final sinks (or their backend abstractions, if they exist) compose their writes/inserts/upserts by properly specifying this kind of encoding.
Programmers guide
NGSIRestHandler class
TBD