NGSIRestHandler

Functionality

Mapping NGSI events to Flume events

This section explains how NGSIRestHandler converts a notified NGSI event (an HTTP message) containing context data into a Flume event (an object in memory or a file) that any of the Cygnus sinks can consume.

Note again that this handler is designed to be used by HttpSource, the native component of Apache Flume. An HTTP message containing an NGSI-like notification is received by HttpSource and passed to NGSIRestHandler, which creates one, and only one, Flume event object to be put in a sink's channel (these channels are mainly objects in memory, or files).

On the one hand, the HTTP message containing the NGSI-like notification is composed of a set of HTTP headers and a payload. On the other hand, a Flume event object is composed of a set of headers and a body. As can be seen, there is a quasi-direct translation between the HTTP message and the Flume event object:

| HTTP message | Flume event object |
|---|---|
| Content-Type header | content-type header (discarded since 0.14.0) |
| Fiware-Service header | fiware-service header |
| Fiware-ServicePath header | fiware-servicepath header |
| Fiware-Correlator header | fiware-correlator header |
| any other header | discarded |
| payload | body |

All the FIWARE headers are added to the Flume event object if notified. Otherwise, either a default value is used (fiware-service and fiware-servicepath take the configured values of default_service and default_service_path respectively; see the configuration section below) or a value is auto-generated (fiware-correlator).

In addition to fiware-correlator, a transaction-id is created to internally identify a complete Cygnus transaction, i.e. one starting at the source, when the context data is notified, and finishing at the sink, where the data is finally persisted. If the Fiware-Correlator header is not notified, fiware-correlator and transaction-id get the same auto-generated value.
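The header mapping and defaulting rules above can be sketched as follows. This is only an illustration in Python, not the handler's actual Java code: the function name `build_flume_headers` is hypothetical, and the use of a UUID for the auto-generated value is an assumption (the real identifier format may differ).

```python
import uuid

# Hypothetical defaults mirroring default_service and default_service_path.
DEFAULT_SERVICE = "default"
DEFAULT_SERVICE_PATH = "/"


def build_flume_headers(http_headers,
                        default_service=DEFAULT_SERVICE,
                        default_service_path=DEFAULT_SERVICE_PATH):
    """Map notified HTTP headers to Flume event headers (illustrative only)."""
    # HTTP header names are case-insensitive, so normalize them first.
    notified = {name.lower(): value for name, value in http_headers.items()}

    # Assumption: a UUID stands in for the auto-generated identifier.
    transaction_id = uuid.uuid4().hex

    # Only the FIWARE headers are kept; any other HTTP header is discarded.
    return {
        "fiware-service": notified.get("fiware-service", default_service),
        "fiware-servicepath": notified.get("fiware-servicepath",
                                           default_service_path),
        # Without a notified correlator, both IDs share the generated value.
        "fiware-correlator": notified.get("fiware-correlator", transaction_id),
        "transaction-id": transaction_id,
    }
```

Calling `build_flume_headers({})` would yield the configured defaults plus identical `fiware-correlator` and `transaction-id` values, which is the behaviour described for notifications that carry no FIWARE headers.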

Additional headers added by Flume interceptors

Although the interceptors used in Cygnus are documented in detail elsewhere, it is worth recalling that:

  • An interceptor is a piece of code in charge of "intercepting" events before they are put in the sink's channel and modifying them by adding, removing or modifying a header.
  • A timestamp header is added by the native TimestampInterceptor. It is expressed as a Unix time.
  • A notified-entities header is added by the custom GroupingInterceptor. This header contains one default destination per notified context element. It is used by the sinks when the grouping rules are not enabled.
  • A grouped-entities header is added by the custom GroupingInterceptor. This header contains one grouped destination per notified context element. It is used by the sinks when the grouping rules are enabled.
  • A grouped-servicepath header is added by the custom GroupingInterceptor. This header contains one grouped service path per notified context element. It is used by the sinks when the grouping rules are enabled.
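To make the idea of "adding headers to an intercepted event" concrete, here is a minimal Python sketch of the two simplest headers in the list. It is not the interceptors' real code. Three things are assumptions: the default destination format `<entityId>_<entityType>` (inferred from the `car1_car` value in the example of this document), the comma used to join several destinations, and the hypothetical function name `add_interceptor_headers`. Grouping rules are not modelled.

```python
import json
import time


def add_interceptor_headers(headers, payload, now=None):
    """Return a copy of the headers with timestamp and notified-entities added."""
    notification = json.loads(payload)

    # One default destination per notified context element.
    # Assumption: the destination is "<entityId>_<entityType>".
    destinations = [
        "{}_{}".format(r["contextElement"]["id"], r["contextElement"]["type"])
        for r in notification["contextResponses"]
    ]

    enriched = dict(headers)  # leave the caller's dictionary untouched
    # Unix time in seconds; "now" can be injected to get reproducible output.
    enriched["timestamp"] = str(int(now if now is not None else time.time()))
    # Assumption: multiple destinations are joined with commas.
    enriched["notified-entities"] = ",".join(destinations)
    return enriched
```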

Example

An example of an NGSI-like event could be the following (the code below is an object representation, not a real data format; see the Orion documentation for the actual notification format):

ngsi-event={
    http-headers={
        Content-Length: 492
        Host: localhost:1028
        Accept: application/json
        Content-Type: application/json; charset=utf-8
        Fiware-Service: vehicles
        Fiware-ServicePath: /4wheels
        Fiware-Correlator: ABCDEF1234567890
    },
    payload={
        {
            "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",
            "originator" : "localhost",
            "contextResponses" : [
                {
                    "contextElement" : {
                        "attributes" : [
                            {
                                "name" : "speed",
                                "type" : "float",
                                "value" : "112.9",
                                "metadatas": []
                            },
                            {
                                "name" : "oil_level",
                                "type" : "float",
                                "value" : "74.6",
                                "metadatas": []
                            }
                        ],
                        "type" : "car",
                        "isPattern" : "false",
                        "id" : "car1"
                    },
                    "statusCode" : {
                        "code" : "200",
                        "reasonPhrase" : "OK"
                    }
                }
            ]
        }
    }
}

As said, Flume events are not very different from the above representation: there is a set of headers and a body. This is an advantage, since it allows for a quick translation between formats. The equivalent object representation (not a real data format) for such a notified NGSI event could be the following Flume event:

flume-event={
    headers={
         timestamp=1429535775,
         transaction-id=0123456789ABCDEF,
         fiware-correlator=ABCDEF1234567890,
         fiware-service=vehicles,
         fiware-servicepath=/4wheels,
         notified-entities=car1_car,
         grouped-entities=cars,
         grouped-servicepath=/mycars
    },
    body={
         entityId=car1,
         entityType=car,
         attributes=[
             {
                  attrName=speed,
                  attrType=float,
                  attrValue=112.9
             },
             {
                  attrName=oil_level,
                  attrType=float,
                  attrValue=74.6
             }
         ]
     }
}
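The body of the representation above can be derived from the notified payload with a simple flattening step. The Python sketch below illustrates this under stated assumptions: the function name `to_body_entities` is hypothetical, the field names (`entityId`, `attrName`, and so on) are taken from the object representation above rather than from a real serialization format, and attribute metadata is dropped because it does not appear in that representation.

```python
import json


def to_body_entities(payload):
    """Flatten each notified context element into the body representation."""
    notification = json.loads(payload)
    entities = []
    for response in notification.get("contextResponses", []):
        element = response["contextElement"]
        entities.append({
            "entityId": element["id"],
            "entityType": element["type"],
            # Metadata is left out, as in the representation above.
            "attributes": [
                {
                    "attrName": attr["name"],
                    "attrType": attr["type"],
                    "attrValue": attr["value"],
                }
                for attr in element.get("attributes", [])
            ],
        })
    return entities
```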

Administration guide

Configuration

NGSIRestHandler is configured through the following parameters:

| Parameter | Mandatory | Default value | Comments |
|---|---|---|---|
| notification_target | no | notify/ | Any other configured value must start with /. |
| default_service | no | default | Only alphanumerics and underscores are accepted. |
| default_service_path | no | / | / is the root service path (also known as the root subservice). Any other configured value must start with /. Apart from the initial slash, only alphanumerics and underscores are accepted. |

A configuration example could be:

cygnusagent.sources = http-source
...
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnusagent.sources.http-source.notification_target = /notify
cygnusagent.sources.http-source.default_service = default
cygnusagent.sources.http-source.default_service_path = /
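The constraints listed in the parameter table can be checked before deploying a configuration. The following Python sketch is a hypothetical helper, not part of Cygnus: the name `validate_handler_config` and the error messages are illustrative, and the checks apply to explicitly configured values only.

```python
import re

# Only alphanumerics and underscores are accepted in the service name.
SERVICE_RE = re.compile(r"[A-Za-z0-9_]+")
# An initial slash, then only alphanumerics and underscores ("/" is the root).
SERVICE_PATH_RE = re.compile(r"/[A-Za-z0-9_]*")


def validate_handler_config(notification_target, default_service,
                            default_service_path):
    """Return a list of human-readable problems; empty if the values look valid."""
    errors = []
    if not notification_target.startswith("/"):
        errors.append("notification_target must start with /")
    if not SERVICE_RE.fullmatch(default_service):
        errors.append("default_service accepts only alphanumerics and underscores")
    if not SERVICE_PATH_RE.fullmatch(default_service_path):
        errors.append("default_service_path must start with / followed by "
                      "alphanumerics and underscores")
    return errors
```

For the configuration example above, `validate_handler_config("/notify", "default", "/")` returns an empty list.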

Accepted character set

This handler only works with UTF-8 encoding. Thus, notifications must send a Content-Type header with application/json; charset=utf-8 as its value. Notifications with any other content type will not be considered and will be discarded.

The UTF-8 character set is expected to be maintained by all the Flume elements in the configuration, so that the final sinks (or their backend abstractions, if they exist) compose their writes/inserts/upserts with this encoding properly specified.
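The Content-Type rule above can be illustrated with a small check. This Python sketch makes two assumptions: the comparison is case-insensitive and tolerant of whitespace around the semicolon, and the function name `is_accepted_content_type` is hypothetical. The real handler may compare the header value more strictly.

```python
def is_accepted_content_type(content_type):
    """Tell whether a Content-Type value is application/json with charset=utf-8."""
    if content_type is None:
        return False
    # Split "type/subtype; param=value" into a media type and its parameters.
    parts = [part.strip().lower() for part in content_type.split(";")]
    media_type, params = parts[0], parts[1:]
    return media_type == "application/json" and "charset=utf-8" in params
```

A notification for which this check fails would be discarded rather than converted into a Flume event.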

Programmers guide

NGSIRestHandler class

TBD
