- Sink parallelization
- Channel considerations
- Events TTL
- Grouping rules
- Writing logs
NOTE: The batching mechanism is currently only available for
Batching is the mechanism Cygnus implements for processing sets of events all together instead of one by one. These sets, properly called batches, are built by
`OrionSink`, the base class all the sinks extend. Thus, having the batches already created by the inherited code, the sinks only have to deal with the persistence of the data within them. Typically, the information within a whole batch is aggregated into a single large data chunk that is stored at once by using a single write/insert/upsert operation. Why?
The important thing about the batching mechanism is that it largely increases the performance of the sink, because the number of writes is dramatically reduced. Let's see an example. Let's assume 100 notifications, no batching mechanism at all and a HDFS storage. Obviously, 100 writes are needed, one per notification, and writing to disk is slow. Now let's assume a batch of size 100. In the best case, all these notifications relate to the same entity, which means all the data within them will be persisted in the same HDFS file, and therefore only one write is required.
Obviously, not all the events will always relate to the same unique entity, and many entities may be involved within a batch. But that is not a problem, since several sub-batches of events are created within a batch, one sub-batch per final destination HDFS file. In the worst case, the 100 events will relate to 100 different entities (100 different HDFS destinations), but that will not be the usual scenario. Thus, assuming a realistic number of 10-15 sub-batches per batch, the 100 writes of the event-by-event approach are replaced with only 10-15 writes.
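The write reduction can be sketched as follows. This is not Cygnus code, just a plain Python illustration of how grouping a batch by destination (the `destination` field is a made-up stand-in for the computed HDFS file) shrinks the number of writes:

```python
from collections import defaultdict

# A batch of 100 notified events; each event relates to one of 10
# entities, and the entity determines the final HDFS destination file.
batch = [{"destination": f"entity{i % 10}", "body": f"data{i}"} for i in range(100)]

# Build one sub-batch per destination; each sub-batch needs one write.
sub_batches = defaultdict(list)
for event in batch:
    sub_batches[event["destination"]].append(event)

print(len(batch))        # 100 events -> 100 writes without batching
print(len(sub_batches))  # 10 sub-batches -> only 10 writes
```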
Nevertheless, a couple of risks arise when using batches:
- The first one is that the last batch may never get built. I.e., with the above batch size of 100, if only 99 events are notified and the 100th event never arrives, then the batch is never ready to be processed by the sink. That is the reason the batching mechanism adds an accumulation timeout, preventing the sink from staying in an eternal state of batch building when no new data arrives. If such a timeout is reached, then the batch is persisted as it is.
- The second one is that the data within the batch may be lost if Cygnus crashes or is stopped while accumulating it. Please observe that, until the batch size (or the timeout) is reached, the data within the batch is not persisted and exists nowhere else in the data workflow (the NGSI source, typically Orion Context Broker, most probably will not have a copy of the data anymore once it has been notified). There is an issue under study regarding this.
By default, all the sinks have a configured batch size and batch accumulation timeout of 1 and 30 seconds, respectively. These are the parameters all the sinks have for this purpose:
```
<agent_name>.sinks.<sink_name>.batch_size = 1
<agent_name>.sinks.<sink_name>.batch_timeout = 30
```
Nevertheless, as explained above, it is highly recommended to increase at least the batch size for performance purposes. Which are the optimal values? The batch size is closely related to the transaction size of the channel the events are taken from (it makes no sense for the first to be greater than the second), and it depends on the estimated number of sub-batches as well. The accumulation timeout depends on how often you want to see new data in the final storage. On the other hand, very large batch sizes and timeouts may have an impact on your data persistence if Cygnus crashes or is stopped in the meantime.
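As an illustration, a sink reading from a channel could align its batch size with the channel's transaction capacity (the values below are hypothetical and must be tuned per deployment):

```
cygnusagent.channels.mychannel.transactionCapacity = 100
cygnusagent.sinks.mysink.batch_size = 100
cygnusagent.sinks.mysink.batch_timeout = 30
```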
Most of the processing effort done by Cygnus is located at the sinks, and these elements can be a bottleneck if not configured appropriately.
The basic Cygnus configuration is a source writing Flume events into a single channel where a single sink consumes those events:
```
cygnusagent.sources = mysource
cygnusagent.sinks = mysink
cygnusagent.channels = mychannel

cygnusagent.sources.mysource.type = ...
cygnusagent.sources.mysource.channels = mychannel
... other source configurations...

cygnusagent.channels.mychannel.type = ...
... other channel configurations...

cygnusagent.sinks.mysink.type = ...
cygnusagent.sinks.mysink.channel = mychannel
... other sink configurations...
```
This can be clearly moved to a configuration with multiple sinks running in parallel. But there is not a single such configuration; there are many:
You can simply add more sinks consuming events from the same single channel. This configuration theoretically increases the processing capabilities on the sink side, but usually shows an important drawback, especially if the events are consumed by the sinks very fast: the sinks have to compete for the single channel. Thus, sometimes you can find that adding more sinks in this way simply makes the system slower than a single sink configuration. This configuration is only recommended when the sinks require a lot of time to process a single event, ensuring few collisions when accessing the channel.
```
cygnusagent.sources = mysource
cygnusagent.sinks = mysink1 mysink2 mysink3 ...
cygnusagent.channels = mychannel

cygnusagent.sources.mysource.type = ...
cygnusagent.sources.mysource.channels = mychannel
... other source configurations...

cygnusagent.channels.mychannel.type = ...
... other channel configurations...

cygnusagent.sinks.mysink1.type = ...
cygnusagent.sinks.mysink1.channel = mychannel
... other sink configurations...
cygnusagent.sinks.mysink2.type = ...
cygnusagent.sinks.mysink2.channel = mychannel
... other sink configurations...
cygnusagent.sinks.mysink3.type = ...
cygnusagent.sinks.mysink3.channel = mychannel
... other sink configurations...
... other sinks configurations...
```
The above mentioned drawback can be solved by configuring a channel for each sink, avoiding the competition for the single channel.
However, when multiple channels are used for the same storage, some kind of dispatcher deciding which channels will receive a copy of the events is required. This is the goal of the Flume Channel Selectors, a piece of software selecting the appropriate set of channels the Flume events will be put in. The default one is the
Replicating Channel Selector, i.e. each time a Flume event is generated at the sources, it is replicated in all the channels connected to those sources. There is another selector, the
Multiplexing Channel Selector, which puts the events in a channel given certain matching-like criteria. Nevertheless:
- We want the Flume events to be replicated for each configured storage. E.g. we want the events to be persisted both in a HDFS and a CKAN storage.
- But within a storage, we want the Flume events to be put into a single channel, not replicated. E.g. among all the channels associated to a HDFS storage, we only want to put the event within a single one of them.
- And the dispatching criteria is not based on a matching rule but on a round robin-like behaviour. E.g. if we have 3 channels (`ch1`, `ch2`, `ch3`) associated to a HDFS storage, then select first `ch1`, then `ch2`, then `ch3` and then again `ch1`, and so on.
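The desired dispatching can be sketched as follows; this is a plain Python illustration of the round robin-like behaviour, not the actual selector code:

```python
import itertools

# Channels associated to a hypothetical HDFS storage.
hdfs_channels = ["ch1", "ch2", "ch3"]

# Round robin: each new event is put into the next channel in turn.
rotation = itertools.cycle(hdfs_channels)
selected = [next(rotation) for _ in range(5)]
print(selected)  # ['ch1', 'ch2', 'ch3', 'ch1', 'ch2']
```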
Since the available Channel Selectors do not fit our needs, a custom selector has been developed:
`RoundRobinChannelSelector`. This selector extends
`AbstractChannelSelector`, as the Replicating Channel Selector and the
Multiplexing Channel Selector do.
```
cygnusagent.sources = mysource
cygnusagent.sinks = mysink1 mysink2 mysink3
cygnusagent.channels = mychannel1 mychannel2 mychannel3

cygnusagent.sources.mysource.type = ...
cygnusagent.sources.mysource.channels = mychannel1 mychannel2 mychannel3 ...
cygnusagent.sources.mysource.selector.type = com.telefonica.iot.cygnus.channelselectors.RoundRobinChannelSelector
cygnusagent.sources.mysource.selector.storages = N
cygnusagent.sources.mysource.selector.storages.storage1 = <subset_of_cygnusagent.sources.mysource.channels>
...
cygnusagent.sources.mysource.selector.storages.storageN = <subset_of_cygnusagent.sources.mysource.channels>
... other source configurations...

cygnusagent.channels.mychannel1.type = ...
... other channel configurations...
cygnusagent.channels.mychannel2.type = ...
... other channel configurations...
cygnusagent.channels.mychannel3.type = ...
... other channel configurations...

cygnusagent.sinks.mysink1.type = ...
cygnusagent.sinks.mysink1.channel = mychannel1
... other sink configurations...
cygnusagent.sinks.mysink2.type = ...
cygnusagent.sinks.mysink2.channel = mychannel2
... other sink configurations...
cygnusagent.sinks.mysink3.type = ...
cygnusagent.sinks.mysink3.channel = mychannel3
... other sink configurations...
```
Basically, the custom Channel Selector type must be configured, together with the mapping of channels per storage. This mapping is configured in the form of:
- Total number of different storages. E.g. if we have a MySQL storage, a CKAN storage and a HDFS storage, then
`cygnusagent.sources.mysource.selector.storages = 3`. Please observe this also applies to different storages of the same type, e.g. if we have a MySQL storage and two different HDFS storages (i.e. different HDFS endpoints), then
`cygnusagent.sources.mysource.selector.storages = 3` as well.
- Subset of channels associated to each storage. The union of all the subsets must be equal to all the channels configured for the source. E.g. if
`cygnusagent.sources.mysource.channels = ch1 ch2 ch3 ch4 ch5 ch6` and
`ch1` is associated to a MySQL storage,
`ch2` and `ch3` are associated to a CKAN storage and
`ch4`, `ch5` and `ch6` are associated to a HDFS storage, then
`cygnusagent.sources.mysource.selector.storages.storage1 = ch1`,
`cygnusagent.sources.mysource.selector.storages.storage2 = ch2,ch3` and
`cygnusagent.sources.mysource.selector.storages.storage3 = ch4,ch5,ch6`.
This Flume Sink Processor is not suitable for our parallelization purposes because the load balancing is done in a sequential way. I.e., whether the load balancer is configured with a round robin-like behaviour or a random one, the sinks are used one by one and not at the same time.
The most important thing when designing a Cygnus deployment (in general, any Flume-based application) is the tradeoff between speed and reliability. This applies especially to the channels.
On the one hand, the
`MemoryChannel` is a very fast channel since it is implemented directly in memory, but it is not reliable at all if, for instance, Cygnus crashes for any reason and is recovered by a third party system (let's say Monit): in that case, the Flume events put into the memory-based channel before the crash are lost. On the other hand, the `FileChannel` and the
`JDBCChannel` are very reliable since there is permanent support for the data in terms of OS files or RDBMS tables, respectively. Nevertheless, they are slower than a
`MemoryChannel` since the I/O is done against the HDD and not against the memory.
There are no empirical tests showing a decrease in performance if the channel capacity is configured with a large number, let's say 1 million Flume events. The
`MemoryChannel` is supposed to be designed as a chained FIFO queue, and the persistent channels only manage a list of pointers to the real data, which should not be hard to iterate.
Such large capacities are only required when the Flume sources are faster than the Flume sinks (and even in that case, sooner or later, the channels will get full) or a lot of processing retries are expected within the sinks (see next section).
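For instance, a large-capacity memory channel could be declared with the standard Flume `MemoryChannel` parameters (the values are illustrative):

```
cygnusagent.channels.mychannel.type = memory
cygnusagent.channels.mychannel.capacity = 1000000
cygnusagent.channels.mychannel.transactionCapacity = 100
```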
In order to calculate the appropriate capacity, just take into consideration the following parameters:
- The amount of events to be put into the channel by the sources per unit time (let's say 1 minute).
- The amount of events to be gotten from the channel by the sinks per unit time.
- An estimation of the amount of events that could not be processed per unit time, and thus to be reinjected into the channel (see next section).
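A back-of-the-envelope calculation with those three parameters could look like this (all the figures are made up for illustration):

```python
# Events put into the channel by the sources per minute (assumed figure).
put_rate = 3000
# Events taken from the channel by the sinks per minute (assumed figure).
take_rate = 2500
# Events reinjected per minute due to processing errors (assumed figure).
reinjection_rate = 200

# Net growth of the channel occupation per minute.
net_growth = put_rate - take_rate + reinjection_rate

# Capacity needed to survive, say, 60 minutes of sustained load.
capacity = net_growth * 60
print(capacity)  # 42000
```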
Every Flume event managed by Cygnus has an associated Time-To-Live (TTL), a number specifying how many times that event can be reinjected into the channel the sink got it from. Events are reinjected when a processing error occurs (for instance, the persistence system is not available, there has been a communication breakdown, etc.). This TTL has to be configured very carefully, since large TTLs may lead to a quick exhaustion of the channel capacity, and once that capacity is reached, new events cannot be put into the channel. In addition, the larger the TTL, the more the performance of the Cygnus instance will decrease, since new fresh events will have to coexist in the queue with old, not yet processed events.
If you don't care about unprocessed events, you may configure a TTL of 0, obtaining the maximum performance regarding this aspect.
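Assuming the sink exposes a `ttl` parameter (the exact parameter name may differ, check your sink's documentation), this would look like:

```
<agent_name>.sinks.<sink_name>.ttl = 0
```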
The grouping rules feature is a powerful tool for routing your data, i.e. deciding the right destination (HDFS file, MySQL table, CKAN resource) for your context data. When it is not used, the default destination applies, i.e. the concatenation of the entity identifier and the entity type.
As you may suppose, using the grouping rules is slower than using the default destination. This is because the destination is decided after checking a list of rules in a sequential way, trying to find a regex match. It is worth remembering that regex matching is slow, and that you may configure as many grouping rules as you want/need.
Nevertheless, you may write your grouping rules in a smart way:
- Place the most probable grouping rules first. Since the checking is sequential, the sooner the appropriate rule is found for a certain event, the sooner another event may be checked. Thus, having the rules applying to the majority of the events at the top of the list will increase the performance; then, put the rules applying to the second largest set of events, and so on.
- The simplest matching set of rules derives from the simplest way of naming the context entities, their types or the fiware-service they belong to. Try to use names that can be easily grouped, e.g. numeric rooms and character rooms can be easily modeled by using only 2 regular expressions such as `room\.(\d*)` and
`room\.(\D*)`, but more anarchic ways of naming them will surely lead to many more, and more complex, rules.
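The effect of rule ordering can be sketched as follows; this is plain Python, not the actual Cygnus matcher, and the rule patterns are made up:

```python
import re

# Grouping rules are checked sequentially; the first match wins.
# Put the rule matching the majority of events first.
rules = [
    (re.compile(r"room\.(\d+)"), "numeric_rooms"),   # most frequent events
    (re.compile(r"room\.(\D+)"), "character_rooms"), # less frequent events
]

def destination(entity_id, default="other"):
    # Sequential check: the earlier a rule matches, the fewer
    # regex evaluations are spent on this event.
    for pattern, dest in rules:
        if pattern.match(entity_id):
            return dest
    return default

print(destination("room.12"))   # numeric_rooms
print(destination("room.red"))  # character_rooms
print(destination("garden.1"))  # other
```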
Writing logs, as any I/O operation involving disk writes, is largely slow. Please avoid writing a huge number of logs unless necessary, i.e. because you are debugging Cygnus, and try running Cygnus at least with the
`INFO` level (even if a lot of logs are still written at that level). The best is running with the
`ERROR` level. Logs are totally disabled by using the `OFF` level.
The logging level Cygnus runs with is configured in the `log4j.properties` file;
`INFO` is configured by default:
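For instance, a Flume-style `log4j.properties` typically carries a line like the following (the exact appender name may differ in your installation):

```
flume.root.logger=INFO,LOGFILE
```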