NGSIGroupingInterceptor
IMPORTANT NOTE: from release 1.6.0, this feature is deprecated in favour of Name Mappings. More details can be found here.
Content:
Functionality
This is a custom Interceptor specifically designed for Cygnus. Its purpose is to alter an original NGSIEvent
object (which comes from a NGSI notification handled by NGSIRestHandler
) by inferring the destination entity where the data regarding a notified entity is going to be persisted. This destination entity, depending on the used sinks, may be a HDFS file name, a MySQL table name or a CKAN resource name. In addition, a new fiware-servicePath
containing the destination entity may be configured; for instance, in case of HDFS, this is a folder; in case of CKAN this is a package; in case of MySQL this is simply a prefix for the table name.
Such an inference is made by inspecting (but not modifying) certain configured fields within the ContextElement
object of the NGSIEvent
; if the concatenation of such fields matches a configured regular expression, then:
- The configured destination entity is added as the value of a
grouped-entity
header. - The configured destination FIWARE service path is added as the value of a
grouped-servicepath
header.
Additionally, a notified-entity
header is added containing the concatenation of the original entity ID and type.
This way, those sinks having enabled the grouping rules will use both the grouped-entity
and the grouped-servicepath
headers. If not, i.e. the original notification is aimed to be used, then the notified-entity
and the fiware-servicepath
headers will be used.
Grouping rules syntax
There exists a file containing Json-like rules definition, following this format:
{
"grouping_rules": [
{
"id": 1,
"fields": [
...
],
"regex": "...",
"destination": "...",
"fiware_service_path": "..."
},
...
]
}
Being:
- id: A unique unsigned integer-based identifier.
- fields: These are the fields that will be concatenated for regular expression matching. The available dictionary of fields for concatenation is "entityId", "entityType" and "servicePath". The order of these fields is important since the concatenation is made from left to right.
- regex: Java-like regular expression to be applied on the concatenated fields. Special characters like '\' must be escaped ('\' is escaped as "\\").
- destination: Name of the HDFS file or CKAN resource where the data will be effectively persisted. In the case of MySQL, Mongo and STH Comet this sufixes the table/collection name.
- fiware_service_path: New
fiware-servicePath
replacing the notified one. The sinks will translate this into the name of the HDFS folder or CKAN package where the above destination entity will be placed. In the case of MySQL, Mongo and STH Comet this prefixes the table/collection name.
For each notification, rules are tried sequentially until one of them matches; at that moment the rules are not checked anymore for that notification.
Regarding the syntax of the rules, all the fields are mandatory and must have a valid value.
Headers before and after intercepting
Before interception, these are the headers added by the NGSIRestHandler to all the internal Flume events:
fiware-service
. FIWARE service which the entity related to the notified data belongs to.fiware-servicepath
. FIWARE service path which the entity related to notified data belongs to. If the notification relates to several entities in different service paths, these are included within this header separated by comma.fiware-correlator
. UUID reserved for identifying e2e flows all along an integration which Cygnus belongs to.transaction-id
. UUID for identifying internal flows, from a Flume source to a Flume sink.
After interception, these are the headers added by NGSIGroupingInterceptor:
notified-entities
. Comma-separated list of full entity IDs/destinations, being this the concatenation of a notified entity ID and its type. Since this is an internal header, the concatenation character is the equals,=
; when doing public, this equals is translater into_
(old encoding) orxffff
(new encoding).grouped-entities
. Comma-separated list of grouped/modified entities/destinations.grouped-servicepaths
. Comma-separated list of grouped/modified FIWARE service paths.
Other interceptors may add further headers, such as the timestamp
header added by native Timestamp interceptor.
Example
Let's assume these rules:
{
"grouping_rules": [
{
"id": 1,
"fields": [
"entityId",
"entityType"
],
"regex": "other\\.(\\d*)room",
"destination": "all_other",
"fiware_service_path": "/other"
},
{
"id": 2,
"fields": [
"entityId",
"entityType"
],
"regex": "suite\\.(\\D*)room",
"destination": "all_suites",
"fiware_service_path": "/suites"
}
]
}
Now, let's assume the following not-intercepted event regarding a received notification (the code below is an object representation, not any real data format):
notification={
headers={
fiware-service=hotel1,
fiware-servicepath=/other,/suites,
correlation-id=1234567890-0000-1234567890
},
body={
{
entityId=suite.12,
entityType=room,
attributes=[
...
]
},
{
entityId=other.9,
entityType=room,
attributes=[
...
]
}
}
}
As can be seen, two entities (suite.12
and other.9
) of the same type (room
) within the same FIWARE service (hotel
) but different service paths (/suites
and /other
) are notified. NGSIRestHandler
will create two NGSIEvent
's:
ngsi-event-1={
headers={
fiware-service=hotel,
fiware-servicepath=/suites,
transaction-id=1234567890-0000-1234567890,
correlation-id=1234567890-0000-1234567890,
timestamp=1234567890,
mapped-fiware-service=hotel,
mapped-fiware-service-path=/suites
},
original-context-element={
entityId=suite.12,
entityType=room,
attributes=[
...
]
},
mapped-context-element=null
}
ngsi-event-2={
headers={
fiware-service=hotel,
fiware-servicepath=/other,
transaction-id=1234567890-0000-1234567890,
correlation-id=1234567890-0000-1234567890,
timestamp=1234567890,
mapped-fiware-service=hotel,
mapped-fiware-service-path=/other
},
original-context-element={
entityId=other.9,
entityType=room,
attributes=[
...
]
},
mapped-context-element=null
}
Once intercepted, the above events will look like this (the code below is an object representation, not any real data format):
intercepted-ngsi-event-1={
headers={
fiware-service=hotel,
fiware-servicepath=/suites,
transaction-id=1234567890-0000-1234567890,
correlation-id=1234567890-0000-1234567890,
timestamp=1234567890,
grouped-entity=all_suites,
grouped-servicepath=/suites,
notified-entity=suite.12_room
},
original-context-element={
entityId=suite.12,
entityType=room,
attributes=[
...
]
},
mapped-context-element=null
}
intercepted-ngsi-event-2={
headers={
fiware-service=hotel,
fiware-servicepath=/other,
transaction-id=1234567890-0000-1234567890,
correlation-id=1234567890-0000-1234567890,
timestamp=1234567890,
grouped-entity=all_other,
grouped-servicepath=/other,
notified-entity=other.9_room
},
original-context-element={
entityId=other.9,
entityType=room,
attributes=[
...
]
},
mapped-context-element=null
}
Administration guide
Configuration
NGSIGroupingInterceptor
is configured through the following parameters:
Parameter | Mandatory | Default value | Comments |
---|---|---|---|
grouping_rules_conf_file | yes | N/A | It is very important to configure the absolute path to the grouping rules file. The grouping rules file is usually placed at [FLUME_HOME_DIR]/conf/ , and there exists a template within Cygnus distribution. |
enable_encoding | no | false | true or false, true applies the new encoding, false applies the old encoding. |
A configuration example could be:
cygnus-ngsi.sources.http-source.interceptors = gi <other-interceptors>
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = [FLUME_HOME_DIR]/conf/grouping_rules.conf
cygnus-ngsi.sources.http-source.interceptors.gi.enable_encoding = false
Management Interface related operations
The Management Interface of Cygnus exposes a set of operations under the /v1/groupingrules
path related to the grouping rules feature, allowing listing/updating/removing the rules. For instance:
GET http://<cygnus_host>:<management_port>/v1/groupingrules
{
"grouping_rules": [
{
"destination": "allcars",
"fields": [
"entityType"
],
"fiware_service_path": "cars",
"id": 1,
"regex": "Car"
},
{
"destination": "allrooms",
"fields": [
"entityType"
],
"fiware_service_path": "rooms",
"id": 2,
"regex": "Room"
}
]
}
Please, check this link in order to know further details.