8. Data Processing Rules

8.1. Summary

NetSpyGlass comes with an embedded Python interpreter that is used to process collected monitoring data. At the end of each polling cycle, the program loads and runs the Python class defined by the configuration parameter monitor.rules. This class operates on lists of monitoring variables and can modify them, create new variables, create log records, add or modify tags on monitoring variables, and trigger alerts.

8.2. Configuration

The configuration parameter monitor.rules is a dot-separated path to the Python class used to process monitoring data. The path may include directories and the module name. The system assumes the path starts in the same directory where the configuration file is located. For example:

monitor.rules = "rules.Rules"

In this case the interpreter loads the class Rules from the file rules.py, which is expected to be in the same directory as the configuration file nw2.conf.

The default configuration loads the class Nw2Rules from the module nw2rules.py, which ships as part of the internal Java resources. You can find a copy of this module in the directory doc in the distribution tar archive. This copy is not actually used by the program; it is provided so you can inspect it and consult its source code when you extend and modify the data processing rules.

Note

The script should be written assuming that particular monitoring variables may not exist when it runs. The presence of a certain monitoring variable or a certain instance depends on the state of the network, because variables are created using information collected during discovery. For example, part of the script may try to process the variable tempSensor; however, if the script runs on a network built with devices that have no sensors, the corresponding monitoring variable will be empty (that is, it has no instances).
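Because import_var() (described below) returns a generator, a plain for loop already handles the empty-variable case gracefully. Here is a minimal standalone sketch of the pattern; the import_var() below is a made-up stand-in, not the real nw2functions.import_var():

```python
# Standalone sketch; this import_var() is a stand-in, not the real
# nw2functions.import_var(). It yields nothing for unknown variables,
# mimicking a network where discovery found no matching components.
def import_var(name, _data={'ifHCInOctets': [100, 200]}):
    for item in _data.get(name, []):
        yield item

processed = 0
for mvar in import_var('tempSensor'):   # no sensors discovered
    processed += 1                      # loop body simply never runs
```

A loop over an empty variable is a no-op, so no explicit existence check is needed.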

class Nw2Rules(log)

The Python class Nw2Rules is used to process the data NetSpyGlass collects when it polls devices. NetSpyGlass calls the function execute() of an instance of this class at the completion of each monitoring cycle. Note that a new instance of this class is created every time it is used to process collected data.

Parameters: log – Java logger. To add a log record, call it like this: self.log.info("Log record")
execute()

The Java environment calls this function to execute the data processing rules.

To modify the default data processing rules, create your own Python class that derives from nw2rules.Nw2Rules. Here is an example:

import nw2rules

class UserRules(nw2rules.Nw2Rules):
    def __init__(self, log):
        super(UserRules, self).__init__(log)

    def execute(self):
        self.log.info("#### UserRules.execute() starts")
        super(UserRules, self).execute()

Assuming this class is saved in the file rules.py, the configuration parameter monitor.rules should look like this:

monitor.rules = "rules.UserRules"

Function execute() in this class should call the same function in the base class first and can then perform its own calculations. See below for more information on what it can do.

Note

You can turn rule processing off completely by using the string none as the value of the parameter monitor.rules. This can be useful when NetSpyGlass runs in a cluster configuration where all computations are performed by a dedicated "compute" secondary server. Example:

monitor.rules = "none"

8.3. How to access monitoring variables

At the end of each polling cycle NetSpyGlass loads the module, creates an instance of the class defined by the monitor.rules parameter, and calls its function execute() to process collected data.

To access a monitoring variable, this function should call nw2functions.import_var(), defined in the module nw2functions:

import nw2rules
from nw2functions import *

class UserRules(nw2rules.Nw2Rules):
    def __init__(self, log):
        super(UserRules, self).__init__(log)

    def execute(self):
        super(UserRules, self).execute()

        in_octets = import_var('ifHCInOctets')

A call to import_var() returns a generator of MonitoringVariable objects. Each object collects monitoring data for a particular variable of one network interface or hardware component of one device: inbound interface utilization, outbound interface errors, CPU utilization, the reading of a particular temperature sensor, and so on. The information collected by one instance of a monitoring variable is obtained by making an SNMP request with one OID, and the name of the monitoring variable usually matches the name of the OID. This is true for variables that collect "raw" or "original" data as it comes from the device.

Very often this data must be processed to make it usable. For example, interface utilization comes in the form of a byte counter, which needs to be converted to a rate in bits/sec for representation in the system. This is done in the Python script by calling built-in functions to compute the rate and multiply it by 8. The result of this calculation can be stored in a new variable named ifInRate, using the following lines of Python code:

in_octets = import_var('ifHCInOctets')
in_rate = mul(rate(in_octets), 8)
export_var('ifInRate', in_rate)

In this example, in_octets is a generator that yields objects of the class MonitoringVariable that hold the values of the corresponding counters collected from all interfaces of all devices by the SNMP collector. This generator can produce thousands of items, each corresponding to one interface of one device.

Here we create a new Python variable in_rate, which is also a generator of MonitoringVariable objects. To make these objects available to NetSpyGlass, so they can appear in the UI and be stored in the database, you need to "export" them by calling export_var().

8.4. How does this work

Function nw2functions.import_var() returns a generator that yields copies of the net.happygears.nw2.py.MonitoringVariable objects stored in the internal data pool. Copies are returned to make it safer and easier to manipulate their time series. If the call to nw2functions.import_var() returned the original objects stored in the internal data pool, all functions that operate on their time series would need to be careful to modify only the last observation and not touch the others, because those observations are used for graphing, alerts and reports by the server, and an error in the Python data processing rules script could easily cause irreversible data loss. Returning copies and then merging them back into the data pool when the script calls nw2functions.export_var() makes this safer.

To reduce the total number of large objects created and destroyed while the Python data processing script runs, the copies returned by nw2functions.import_var() are taken from a separate pool of net.happygears.nw2.py.MonitoringVariable objects so they can be reused. These objects are called anonymous monitoring variables; the size of this shared pool is monitored using the variable numAnonVars in the category Monitor.

Using pooled anonymous variables greatly reduces the number of objects that need to be created and then destroyed on every run of the data processing script. Consider a NetSpyGlass server that monitors 1000 devices with a total of 100,000 interfaces. To calculate inbound interface utilization for each interface, it executes the following code:

if_hc_in_octets = import_var('ifHCInOctets')
export_var('ifInRate', mul(rate(if_hc_in_octets), 8))

The call to import_var() returns 100,000 objects (via a generator). These objects are manipulated by rate() and mul() and finally returned to the system by export_var(). Functions rate() and mul() do not create new objects; they modify those passed to them and return the same ones. Function export_var() takes the last observation from the time series buffer of each input variable and merges it into the time series buffer of the "actual" monitoring variable object in the main data pool in the server. This makes the calculated value available for graphing, reports, etc. The same process has to be repeated for all other monitoring variables that describe interface-related metrics. If we have 10 variables like that, every run of the script will use 1 million anonymous variables. Pooling these objects allows us to avoid creating and destroying a million objects on every script run. In reality the number is greater, because the total number of interface-related monitoring variables is greater, and the same thing happens with variables that monitor parameters of hardware components, protocols, etc.

To reuse shared anonymous variables effectively, they need to be returned to the pool as soon as the script is done with them. This is not an easy task, though. The simplest approach is to collect all allocated anonymous variables at the end of the run of the data processing script, but this is not very efficient because each call to import_var() would then take a new set of anonymous variables from the pool, even if they are the same as those already taken before. Also, code sequences like the one shown above form "chains": the call to import_var() only returns a generator; it does not actually perform any calculations or data manipulations, and it does not take anonymous variables from the pool yet. What happens is that export_var() starts calling the chain of generators mul -> rate -> import_var, which takes one anonymous object from the pool, fills it with data, calculates the rate, multiplies it, and finally merges the result back into the main data pool. At this point export_var() releases the anonymous variable, and it is returned to the pool of anonymous variables so that the next iteration done by import_var() can take the same object from the pool. Now, instead of creating 100,000 anonymous variables, we need only one. This dramatically reduces the total number of objects that exist in the system at the same time, reducing the total memory footprint of the server.
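The mechanism can be illustrated with a standalone sketch. All names below are made up for illustration; this is not the NetSpyGlass implementation, just a lazy generator chain where each iteration checks one object out of a pool and the export step releases it immediately:

```python
# Illustrative sketch (not the NetSpyGlass implementation): a generator
# chain that reuses a single pooled object per iteration.

class Pool(object):
    def __init__(self):
        self.free = []
        self.created = 0

    def take(self):
        if self.free:
            return self.free.pop()
        self.created += 1
        return {"value": None}       # stand-in for an anonymous variable

    def release(self, obj):
        self.free.append(obj)

pool = Pool()

def import_var(values):
    for v in values:                 # lazily check out one pooled object at a time
        obj = pool.take()
        obj["value"] = v
        yield obj

def mul(gen, k):
    for obj in gen:
        obj["value"] *= k            # modify in place, no new objects
        yield obj

def export_var(gen, results):
    for obj in gen:
        results.append(obj["value"])
        pool.release(obj)            # recycle immediately; next iteration reuses it

results = []
export_var(mul(import_var([1, 2, 3, 4, 5]), 8), results)
# Only one pooled object is ever created, even for five inputs.
```

Because every stage is lazy, the whole chain runs one item at a time, which is why a single pooled object suffices.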

However, this creates certain caveats. For example, the following would not work:

if_hc_in_octets = list(import_var('ifHCInOctets'))
export_var('ifInRate', mul(rate(if_hc_in_octets), 8))
# do some other calculations with if_hc_in_octets here

Normally, the generator returned by the call to import_var() can be used only once. All objects it yields are consumed by the call to rate(), and after that, passing the generator object to other functions would not work. Wrapping it in list() may seem like a solution: the list consumes all objects the generator can produce and holds references to them in a Python list. However, since the call to export_var() returns anonymous variables to the pool, the items in that list cannot be used again; they have been recycled and are likely to hold data from some other "original" monitoring variables after the call to export_var().

Another problem with wrapping the result of import_var() in a list is that it defeats the purpose of returning anonymous variables to the pool as soon as possible. As shown above, this code sequence normally needs only one anonymous variable object; if we use a list, however, it will hold all 100,000 objects at some point, so we need 100,000 anonymous variable objects. All these anonymous variables are recycled at the end of the run of the data processing script, so they do not leak, but the server has to hold them all in memory for some time.

Important

Do not wrap the result of the call to import_var() in a Python list in order to use the same anonymous variables more than once. The instances in such a list become invalid, and operations with them may yield unexpected and unpredictable results if they are used after a call to export_var() or certain other functions from the module nw2functions.

A call to import_var() is cheap; it only creates an iterator. Filling the returned anonymous variables with data is also very efficient and does not take a lot of time. Holding on to prepared anonymous variables does not speed up script execution and is unnecessary. Just call import_var() again if you need to perform different operations on the same monitoring variables.

Another interesting case of recycling anonymous variables involves filtering functions such as nw2functions.filter_by_tags() or nw2functions.skip_nans(). Consider the following code snippet (taken from one of the examples below in this section):

if_out_rate = filter_by_tags(import_var('ifOutRate'), ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])
aggr = new_var('Cogent', 'peering')
aggregate(aggr, skip_nans(if_out_rate))
aggr.addTag('VariableTags.Aggregate')
export_var('ifOutRate', aggr)

Here we import the variable ifOutRate but want only the instances that have the tag 'ifBGP4Peer.AS174' and not 'VariableTags.Aggregate'. In other words, out of 100,000 instances of the variable ifOutRate we want to work with only a few. The call to import_var() will still use 100,000 anonymous variables (sequentially, not all at the same time), but the call to filter_by_tags() will pass through only a few. What happens to those it does not pass through? If nothing, they would linger in the state "checked out from the pool but not returned" until the end of the run of the data processing script, and we would be back to the situation where we need 100,000 anonymous variables. To avoid this problem, nw2functions.filter_by_tags() recycles the variables it does not pass through.

The same is true of nw2functions.skip_nans(), which skips monitoring variables whose last value in the time series buffer is a NaN. This function also recycles the variables it does not pass through.

8.5. Examples

8.5.1. Converting interface counters to interface utilization

IF-MIB OIDs ifHCInOctets and ifHCOutOctets return the values of internal 64-bit byte counters, but what we really need for the display of monitoring data is interface utilization in bits/sec. To convert, we calculate the rate and multiply it by 8. We also smooth the data a little by computing a sliding window median over 4 data points:

ifHCInOctets = import_var('ifHCInOctets')
ifInRate = mul(median(rate(ifHCInOctets, 4)), 8)
export_var('ifInRate', ifInRate)

ifHCOutOctets = import_var('ifHCOutOctets')
ifOutRate = mul(median(rate(ifHCOutOctets, 4)), 8)
export_var('ifOutRate', ifOutRate)

In this example rate() processes only the last 4 observations in the time series passed to it as input. Function nw2functions.rate() does not copy the full time series from the input if it needs only a few observations. The call to nw2functions.rate() returns new MonitoringVariable objects with time series of at most 4 observations. These objects are then fed to nw2functions.median(), which computes the median value of these observations and returns a MonitoringVariable object whose time series has only one observation. This object is then exported back to the NetSpyGlass data processor, which merges it with its own copy of the MonitoringVariable object for the same device and component.
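The arithmetic behind this chain can be sketched in plain Python, using (timestamp, counter) tuples instead of MonitoringVariable objects. The helper names here are made up; the real rate() and median() operate on MonitoringVariable time series:

```python
# Hypothetical standalone sketch of the rate -> median -> mul chain.

def rates(samples):
    """Per-interval rates (units/sec) from a list of (timestamp, counter) pairs."""
    return [(c2 - c1) / float(t2 - t1)
            for (t1, c1), (t2, c2) in zip(samples, samples[1:])]

def median(values):
    s = sorted(values)
    n = len(s)
    mid = n // 2
    return s[mid] if n % 2 else (s[mid - 1] + s[mid]) / 2.0

# Four byte-counter observations taken 60 seconds apart:
samples = [(0, 0), (60, 6000), (120, 12000), (180, 24000)]
bps = median(rates(samples[-4:])) * 8    # bytes/sec -> bits/sec
```

With these samples the per-interval rates are 100, 100 and 200 bytes/sec; the median smooths the spike away, and multiplying by 8 yields 800 bits/sec.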

8.5.2. Calculate interface utilization as percentage of speed

To do this, use the variable ifSpeed to get the interface speed, then call nw2functions.div() to divide and nw2functions.mul() to multiply by 100 to get percentages:

ifSpeed = import_var('ifSpeed')
ifInRate = import_var('ifInRate')
ifInRatePercent = mul(div(ifInRate, ifSpeed), 100.0)
export_var('ifInRatePercent', ifInRatePercent)

8.5.3. max()

Here is how we can use the Python built-in function max() to find the hardware component with the highest temperature across all components of all devices we monitor:

hottestComponent = max(import_var('tempSensor'))

tempSensor is a list of monitoring variables that poll the temperature sensors discovered on the devices the system monitors. Each item in this list corresponds to one sensor on one device. Since MonitoringVariable provides the function __cmp__(), instances of this class can be compared, and a call to max() returns the instance with the highest temperature. The attributes device and ds can be used to retrieve information about the device and hardware component:

log.info("Hottest component: " + hottestComponent.device + " " +
    hottestComponent.ds.descr + ": " +
    str(hottestComponent.timeseries.last().value) + " C")

8.5.4. group_by_device()

To find the hardware component with the highest temperature for each device defined in the system, we could use the following Python code:

hottest_per_device = (max(x) for x in group_by_device(import_var('tempSensor')) if x is not None)

def printComponentTemp(var):
    if var.timeseries:
        log.info("  {0:25} | {1:40} | {2:4} {3}".format(
            var.device, var.ds.descr, var.timeseries[-1].value, var.ds.unit))

map(printComponentTemp, hottest_per_device)

The call to nw2functions.group_by_device() groups the input by device and returns a list of lists of MonitoringVariable objects; calling max() on each list created by nw2functions.group_by_device() quickly finds the maximum value for each device. After this, the list hottest_per_device has a monitoring variable instance for each device on which we monitor at least one temperature sensor via the variable tempSensor. Since the input to max() is a list of MonitoringVariable objects, the result is an instance of the same class, which means it retains a reference to the DataSource object that tells us which device and component it describes. The built-in Python function max() uses the "magic" function MonitoringVariable.__cmp__(), which compares the value of the last data point in the time series of each instance; max() returns the instance with the greater last value. Since it returns a reference to the unmodified original monitoring variable instance, this instance retains its full time series.

Finally, this example calls the Python built-in map() to print each member of the list hottest_per_device as a neatly formatted table.

8.5.5. Filtering by tags

Suppose we want to calculate the total volume of traffic we send towards AS1000, but our network peers with AS1000 in multiple points, so we have to add up the traffic levels going across several interfaces of different devices. We can use tag matching to find the interfaces: each peering interface connected to AS1000 has the tag "ifBGP4Peer.AS1000". The following call to the Python built-in function filter() picks the right instances of the variable ifOutRate:

ifOutRate = import_var('ifOutRate')
ifOutRateAS1000 = filter(lambda var: 'ifBGP4Peer.AS1000' in var.tags, ifOutRate)

The list ifOutRateAS1000 created this way has only the components that correspond to interfaces peering with AS1000. Now, to calculate the aggregate, we can call sum():

aggregateRateToAS1000 = [sum(ifOutRateAS1000)]

Do not forget to wrap the variable returned by sum() in a list ("[ ]"), because NetSpyGlass expects all monitoring variables to be lists or generators, but sum() returns a scalar.

The Python function sum() relies on the "magic" function MonitoringVariable.__add__(), which can add a MonitoringVariable instance and a constant, or two MonitoringVariable instances. Function __add__() returns a new copy of the MonitoringVariable object to follow the standard Python semantics of __add__().
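How sum() cooperates with __add__() can be shown with a small standalone class. This is a loose sketch, not the real MonitoringVariable; it mimics only the value addition and the tag-intersection side effect described in this section:

```python
# Hypothetical sketch of objects that sum() can aggregate via __add__().

class Var(object):
    def __init__(self, value, tags=None):
        self.value = value
        self.tags = set(tags or [])

    def __add__(self, other):
        if isinstance(other, Var):
            # new object; tags reduced to the intersection of both tag sets
            return Var(self.value + other.value, self.tags & other.tags)
        return Var(self.value + other, self.tags)   # Var + constant

    __radd__ = __add__      # lets sum() start from its default initial 0

vars_ = [Var(10, ['a', 'b']), Var(5, ['b', 'c']), Var(1, ['b'])]
total = sum(vars_)          # Var with value 16 and tags {'b'}
```

Defining __radd__ is what allows the plain sum(vars_) call, since sum() begins by computing 0 + first_item.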

8.5.7. Tag manipulation

Sometimes it can be useful to add or remove tags on monitoring variables. For example, one rule in the rule processing script may add a tag to variables depending on their values, while another rule later in the script may do something depending on that tag. Tags added this way are not stored in the database, but since the program re-evaluates the rules every 30 sec, the rule that adds the tag keeps adding it, making it work as if it were persistent. Here is an example of how this could be done:

def func(var):
    value = var.timeseries[-1].value
    if value > 2000:
        var.tags.add('ColorLevel.2')
        var.tags.add('Color.red')
    else:
        var.tags.add('ColorLevel.1')
        var.tags.add('Color.green')

# mv1 is a list or generator of MonitoringVariable objects
map(func, mv1)

The standard data processing script also does this to convert the operational status of interfaces ("up" or "down") to tags in the facet ifOperStatus. These tags make it possible to filter interfaces by operational status in the device details panel using the tag filter. Here is how this is done:

def setInterfaceOpStatusTag(self, var):
    """
    Set tag to reflect interface oper status. Interfaces with status
    "up" get tag "ifOperStatus.Up", those in status "Down" get
    "ifOperStatus.Down". If variable has no time series data,
    skip it and do nothing.

    :param var   MonitoringVariable instance
    """
    if len(var.timeseries) > 0:
        if self.isInterfaceUp(var):
            var.tags.add("ifOperStatus.Up")
        else:
            var.tags.add("ifOperStatus.Down")

def execute(self, vars):
    self.log.info("===  Set Tags")
    map(self.setInterfaceOpStatusTag, vars['ifOperStatus'])

8.5.8. Calculating aggregates

The following example computes the sum of outbound traffic through interfaces of all devices that have the tag 'ifBGP4Peer.AS174'. This tag is added automatically to all interfaces that carry BGP peering sessions with AS174 (Cogent). The calculated result is stored in a new monitoring variable instance that is added back to the same variable 'ifOutRate'. The new instance has device name "Cogent" and component "peering". The code looks like this:

import nw2rules
from nw2functions import *

class UserRules(nw2rules.Nw2Rules):
    def __init__(self, log):
        super(UserRules, self).__init__(log)

    def calculate_aggregate(self):
        if_out_rate = filter_by_tags(import_var('ifOutRate'), ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])
        aggr = new_var('Cogent', 'peering')
        aggregate(aggr, if_out_rate)
        aggr.addTag('VariableTags.Aggregate')
        export_var('ifOutRate', aggr)

The actual work is done by the function nw2functions.aggregate(), which adds up the values of the latest observations of the variable instances passed as the second argument and stores the result in the monitoring variable passed as the first argument.

To pick only the interfaces that carry traffic to AS174, we use filtering by tags:

filter_by_tags(import_var('ifOutRate'), ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])

Here we import the variable "ifOutRate" and pass it to nw2functions.filter_by_tags(), which matches instances by the tags passed as the second argument. If a tag is preceded by "!", the function matches if this tag is not present in the variable. In this example we pick monitoring variable instances that have the tag "ifBGP4Peer.AS174" and do not have the tag "VariableTags.Aggregate". Matching on the absence of "VariableTags.Aggregate" helps avoid adding the calculated aggregate value to itself. Function nw2functions.filter_by_tags() returns a generator that yields matching monitoring variable instances.

Function aggregate() has a side effect: it sets the tags of the destination variable to the intersection of the tag sets of all input monitoring variable instances.

In the end, we add the calculated aggregate instance to the same monitoring variable 'ifOutRate' by calling nw2functions.export_var() with this name. We can distinguish this new instance by its device and component names ("Cogent" and "peering", respectively).

The following example is in fact a copy of the internal implementation of nw2functions.aggregate(); it illustrates how the Python built-in function reduce() can be used to calculate an aggregate:

def aggregate(aggr_var, mvlist):
    aggr_var.clearTags()   # let MonitoringVariable.__add__() assign common set of tags to this
    reduce(lambda x, y: x + y, mvlist, aggr_var)
    return aggr_var

We can use reduce() because the Java class MonitoringVariable has the "magic" Python function __add__() that adds a constant or the value of another MonitoringVariable object. This function works with the latest observations in the time series of both objects and overwrites the time series of the MonitoringVariable object it is called on, leaving it with a time series of length 1. Function __add__() has a side effect: it sets the tags of the object it is called on to the intersection of its tags and those of the second MonitoringVariable object it operates on.

The next example takes a more direct approach and calculates the aggregate value of the new variable without the aid of nw2functions.aggregate(). Here we calculate the sum of values of variables tracking firewall counters with the same name configured on different routers:

import nw2rules
from nw2functions import *

COUNTER_NAME = 'discard-ge-0/0/0.0-i'
TAG = 'CounterName.' + COUNTER_NAME

class UserRules(nw2rules.Nw2Rules):
    def __init__(self, log):
        super(UserRules, self).__init__(log)

    def calculate_aggregate(self, aggr, var_name):
        ts = 0
        new_value = 0.0
        for fw_counter_var in filter_by_tags(import_var(var_name), [TAG, '!VariableTags.Aggregate']):
            copy_attrs(aggr, fw_counter_var)
            new_value += fw_counter_var.timeseries.getLastValue()
            ts = fw_counter_var.timeseries.getLastTimeStamp()
        aggr.timeseries.put(ts, new_value)

    def execute(self):
        super(UserRules, self).execute()

        aggr = new_var('Attackers', 'Discarded_Combined')
        aggr.addTag('VariableTags.Aggregate')
        self.calculate_aggregate(aggr, 'fwCntrPacketRate')
        export_var('fwCntrPacketRate', [aggr])

        aggr = new_var('Attackers', 'Discarded_Combined')
        self.calculate_aggregate(aggr, 'fwCntrByteRate')
        export_var('fwCntrByteRate', [aggr])

As before, we create a new variable by calling nw2functions.new_var() and give it made-up device and component names. Then we import the variable 'fwCntrPacketRate' or 'fwCntrByteRate' and iterate over its instances, looking for the ones whose tag matches the counter name. We take the last timestamp and value from each input variable and calculate the aggregate. Once this code is executed by the NetSpyGlass server, a row with device name "Attackers" and component "Discarded_Combined" should appear in the Graphing workbench as a member of the monitoring variables "fwCntrPacketRate" and "fwCntrByteRate".

8.5.8.1. Aggregates and NaN values

When the function nw2functions.aggregate() adds up the values of monitoring variable instances to calculate the aggregate value, the result changes if the latest observation of one or more input variables is NaN. Since the result of adding a number and NaN is NaN, the calculated aggregate becomes NaN even if only one of the input variables has this value. This happens, for example, when a device is taken offline and NetSpyGlass cannot poll it. In this case the monitoring variables created for it linger for some time until they expire, but their values are NaN. If these variables are used to calculate some kind of aggregate value, the aggregate value becomes NaN, too.
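The effect is easy to demonstrate with plain Python floats; this sketch uses only the standard library and ordinary numbers, not MonitoringVariable objects:

```python
# Plain-Python illustration of NaN propagation in a sum.
import math

values = [100.0, 250.0, float('nan')]    # one device offline -> NaN sample
naive = sum(values)                      # NaN poisons the whole aggregate
safe = sum(v for v in values if not math.isnan(v))   # 350.0
```

Filtering the NaN samples out before summing, as skip_nans() does for monitoring variables, keeps the aggregate usable.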

This can be counter-intuitive, since we do not want the aggregate graph to break just because one device has been taken offline. To work around this issue, you can use the function nw2functions.skip_nans() to filter out variables whose latest value is NaN:

import nw2rules
from nw2functions import *

class UserRules(nw2rules.Nw2Rules):
    def __init__(self, log):
        super(UserRules, self).__init__(log)

    def calculate_aggregate(self):
        if_out_rate = filter_by_tags(import_var('ifOutRate'), ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])
        aggr = new_var('Cogent', 'peering')
        aggregate(aggr, skip_nans(if_out_rate))   # Call to skip_nans() skips variable instances with value NaN
        aggr.addTag('VariableTags.Aggregate')
        export_var('ifOutRate', aggr)

8.5.9. Calculating average value using data that fits in the memory buffer

To calculate the average value of a variable over the data kept in memory, we access the time series buffer of the variable like this:

for mvar in import_var('ifInRate'):
    avg = 0
    cntr = 0
    for observation in mvar.timeseries:
        if observation.isNaN():
            continue
        avg += observation.value
        cntr += 1
    if cntr == 0:
        avg = float('nan')
    else:
        avg /= cntr
    avg_mvar = copy(mvar)
    avg_mvar.timeseries.clear()
    avg_mvar.timeseries.put(current_timestamp(), avg)
    export_var('ifInRateAvg', [avg_mvar])

Note how we skip NaNs in the time series while calculating the average.

8.5.10. Calculating the 95th percentile

The 95th percentile value of interface utilization is used for Internet billing and capacity planning, and is an often requested metric. The NetSpyGlass Python library offers the function nw2functions.percentile() that can be used to calculate it:

def calculate_percentile(self, input_var_name, p_var_name, filter_tags, threshold_percent):

    for mvar in filter_by_tags(import_var(input_var_name), filter_tags):
        ts = current_timestamp()
        p_value = percentile(mvar, threshold_percent)
        p_mvar = copy(mvar)
        p_mvar.timeseries.clear()
        p_mvar.timeseries.put(ts, p_value)
        export_var(p_var_name, [p_mvar])

def execute(self):
    self.calculate_percentile('ifInRate',  'ifInRateP95',    ['ifAdminStatus.Up'], 95)
    self.calculate_percentile('ifOutRate',  'ifOutRateP95',  ['ifAdminStatus.Up'], 95)

Function nw2functions.percentile() takes the following arguments:

  • mvar : input monitoring variable
  • threshold_percent : threshold as percentage of total number of observations in mvar

Function nw2functions.percentile() takes the observations from the time series of its argument mvar, sorts them, and returns the value of the observation nearest to threshold_percent of the total number of observations (it uses the "nearest rank" method).
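The nearest-rank method itself can be sketched in a few lines of plain Python. This is an illustration of the general technique, not the source of nw2functions.percentile(), which operates on MonitoringVariable time series:

```python
# Hypothetical sketch of the "nearest rank" percentile method.
import math

def nearest_rank_percentile(values, threshold_percent):
    """Return the value at the nearest-rank percentile of `values`."""
    ordered = sorted(values)
    # rank = ceil(P/100 * N), 1-based, per the nearest-rank definition
    rank = int(math.ceil(threshold_percent / 100.0 * len(ordered)))
    return ordered[max(rank, 1) - 1]

samples = [15, 20, 35, 40, 50]
p30 = nearest_rank_percentile(samples, 30)   # rank ceil(1.5) = 2 -> 20
p95 = nearest_rank_percentile(samples, 95)   # rank ceil(4.75) = 5 -> 50
```

Note that the nearest-rank method always returns one of the actual observations; it never interpolates between two values.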

Function calculate_percentile() in the example above takes the following arguments:

  • input_var_name : the name of the input variable, e.g. ifInRate
  • p_var_name : the name to use for the percentile variable, e.g. ifInRateP95
  • filter_tags : (list of strings) the tags to be used as a filter to pick specific instances of the input variable to use.
  • threshold_percent : threshold as a percentage of the total number of observations (e.g. 95 for the 95th percentile)

For each monitoring variable instance of the input variable, calculate_percentile() calls nw2functions.percentile() to calculate the N-th percentile value, then creates a new monitoring variable with the name p_var_name as a copy of the input variable and adds the calculated value to it as a new observation. Finally, the new variable is exported back to the data processor to be merged with its copy in the variable pool.

Call calculate_percentile() with a different variable name and threshold_percent value to calculate the 75th or 99th percentile.

8.5.11. Calculating weekly peak values

The following example creates a new monitoring variable whose values equal the weekly peak value of the corresponding input variable. The peak value is reset every Monday at 00:00 UTC, and the calculation starts over. The difficulty with this calculation is that the NetSpyGlass server usually does not have data in memory spanning a whole week. The typical size of the memory buffer is measured in hours and can be between 1 hour (the default) and 1 day. We cannot simply call max(mvar.timeseries) to find the observation with the maximum value, because this only works on the data in memory. Instead, we compare the current value of the variable that holds the peak with the current value of the input variable and assign the greater of the two back to the peak variable. If we did nothing else, the peak variable would hold the maximum value of the input variable for all time since the peak variable was created. To avoid this, we reset its value on Monday morning, which makes it track the peak value of the input for the calendar week.
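The compare-and-reset step can be sketched with plain values and the standard datetime module. The function name and the (peak, current, now, last_update) signature are made up for illustration; the full example below does this with MonitoringVariable time series:

```python
# Hypothetical sketch of the peak-update step: keep the greater of the
# stored peak and the current sample, resetting at Monday 00:00 UTC.
import datetime

def update_weekly_peak(peak, current, now, last_update):
    """Return the new peak value; `now`/`last_update` are UTC datetimes."""
    # Monday 00:00 UTC that starts the current week
    week_start = (now - datetime.timedelta(days=now.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0)
    if last_update < week_start:
        return current                 # first sample of a new week: reset
    return max(peak, current)

mon = datetime.datetime(2024, 1, 8, 0, 5)    # a Monday, just past the reset
sun = datetime.datetime(2024, 1, 7, 23, 55)  # the previous Sunday
```

With these inputs, update_weekly_peak(900.0, 100.0, mon, sun) resets to 100.0 because the last update fell in the previous week, while update_weekly_peak(900.0, 100.0, mon, mon) keeps the stored peak of 900.0.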

In fact we calculate two weekly peak values, one for the input interface utilization and one for the output utilization. The first variable is ifInRateWeeklyPeak, calculated using data from the variable ifInRate. The pair of variables for the output utilization is 'ifOutRate' and 'ifOutRateWeeklyPeak'.

The calculation is done by the function calculate_weekly_peak_var() that takes three arguments:

  • input_var_name : the name of the input variable, e.g. ifInRate
  • peak_var_name : the name to use for the weekly peak variable, e.g. ifInRateWeeklyPeak
  • filter_tags : (list of strings) the tags used as a filter to pick specific instances of the input variable.

Here is the code:

import datetime
import nw2rules
from nw2functions import *


class UserRules(nw2rules.Nw2Rules):
    def __init__(self, log):
        super(UserRules, self).__init__(log)

    def calculate_long_time_range_aggregate(self, input_var_name, avg_var_name, filter_tags, aggr_func):
        """Calculate and export a variable that tracks some aggregate value over a long time interval

        :param input_var_name: name of the input variable to aggregate
        :param avg_var_name:   the name of the variable the result should be stored in
        :param filter_tags:    a list of tags to filter/match data to aggregate
        :param aggr_func:      a function that calculates the aggregate value
        """
        input_var_gen = filter_by_tags(import_var(input_var_name), filter_tags)
        avg_var_gen = filter_by_tags(import_var(avg_var_name), filter_tags)

        for input_mvar, aggr_mvar in left_join(input_var_gen, avg_var_gen):
            current_value = input_mvar.timeseries.getLastValue()

            if aggr_mvar is None:
                # we don't have this variable yet
                current_aggr = float('nan')
                aggr_mvar = copy(input_mvar)
                aggr_value = current_value
            else:
                current_aggr = aggr_mvar.timeseries.getLastNonNaNValue()
                aggr_value = aggr_func(input_mvar, aggr_mvar)

            aggr_mvar.timeseries.clear()
            aggr_mvar.timeseries.put(current_timestamp(), aggr_value)
            self.log.info('Updating variable %s.%s.%s : current_value=%.1f current_aggr=%.1f aggr_value=%.1f' %
                          (avg_var_name, input_mvar.ds.getNode().getId(), input_mvar.ds.getIndex(),
                           current_value, current_aggr, aggr_value))
            export_var(avg_var_name, [aggr_mvar])

    def weekly_peak(self, input_mvar, aggr_mvar):
        """
        Calculate max() of the two values, except at 00:00 on Monday when the current value is returned

        :param input_mvar:  (MonitoringVariable instance) input variable
        :param aggr_mvar:   (MonitoringVariable instance) aggregate variable
        :return:            calculated peak value
        """
        assert isinstance(input_mvar, MonitoringVariable)
        assert isinstance(aggr_mvar, MonitoringVariable)
        if input_mvar.timeseries:
            current_value = max(input_mvar.timeseries).value
            current_aggr = aggr_mvar.timeseries.getLastNonNaNValue()
            if self.is_monday_morning():
                return input_mvar.timeseries.getLastNonNaNValue()
            else:
                return max(current_value, current_aggr)
        else:
            return float('nan')

    def is_monday_morning(self):
        """
        Return True if the current time is Monday 00:00 UTC
        """
        now = datetime.datetime.utcnow().utctimetuple()
        return now.tm_wday == 0 and now.tm_hour == 0 and now.tm_min == 0

    def calculate_weekly_peak_var(self, input_var_name, aggr_var_name, filter_tags):
        """Calculate and export a variable that tracks the peak value for the current week.

        :param input_var_name:  name of the input variable to aggregate
        :param aggr_var_name:   the name of the variable the result should be stored in
        :param filter_tags:     a list of tags to filter/match data to aggregate
        """
        self.calculate_long_time_range_aggregate(
            input_var_name, aggr_var_name, filter_tags, self.weekly_peak)

    def execute(self):
        super(UserRules, self).execute()

        self.calculate_weekly_peak_var('ifInRate', 'ifInRateWeeklyPeak', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174'])
        self.calculate_weekly_peak_var('ifOutRate', 'ifOutRateWeeklyPeak', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174'])

The “heavy lifting” is done by function calculate_long_time_range_aggregate():

  • We use input_var_gen = filter_by_tags(import_var(input_var_name), filter_tags) to filter the instances of the input variable, and of the variable that holds peak values, by the provided tags. Function nw2functions.filter_by_tags() returns a generator.
  • We use the library function nw2functions.left_join() to match input variables with the variables where we store peak values, by device and component. Function nw2functions.left_join() returns tuples of MonitoringVariable objects; if it cannot find a matching item in the sequence passed as the second argument, it returns a tuple whose second item is None.
  • There are two possible cases: 1) the peak variable already exists, or 2) it does not exist yet. To check whether it exists, we inspect the second item of the tuple returned by nw2functions.left_join(). If it is None, there is no corresponding peak variable yet, so we create it as a copy of the input variable and initialize its value with the current value of the input variable.
  • To calculate the aggregate value, we use the function passed to calculate_long_time_range_aggregate() as the last argument. This is expected to be a function of two arguments: the input variable instance and the aggregate variable instance (in this case, the peak).
  • Function weekly_peak() returns max() of its two arguments, except at 00:00 UTC on Mondays, when it returns the current value of the input variable (its first argument). This resets the weekly peak on Monday morning.
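The matching behavior of nw2functions.left_join() can be sketched with a plain-Python stand-in. This is an assumption about its semantics based on the description above; the real function operates on MonitoringVariable objects and matches them by device id and component index:

```python
def left_join(left_items, right_items, key=lambda v: (v['device'], v['index'])):
    """Illustrative re-implementation of a left join keyed by (device, index).

    Yields (left, right) tuples; the right element is None when no match
    exists in right_items.
    """
    right_by_key = {key(item): item for item in right_items}
    for item in left_items:
        yield item, right_by_key.get(key(item))

# toy stand-ins for input variables and existing peak variables
inputs = [{'device': 1, 'index': 2, 'value': 10.0},
          {'device': 1, 'index': 3, 'value': 20.0}]
peaks = [{'device': 1, 'index': 2, 'value': 15.0}]

pairs = list(left_join(inputs, peaks))
# pairs[0] carries an existing peak variable; pairs[1] has None as its
# second item, which is the signal to create a new peak variable
```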

To make the new variables appear in the Graphing Workbench, add the following to the bottom of your nw2.conf file:

variables.ifInRateWeeklyPeak = ${variables.ifInRate}
variables.ifOutRateWeeklyPeak = ${variables.ifOutRate}

graphingWorkbench.variables += ifInRateWeeklyPeak
graphingWorkbench.variables += ifOutRateWeeklyPeak

8.5.12. Calculating long term average values

Another interesting example is the task of calculating the long-term average value of a variable over time intervals longer than what NetSpyGlass keeps in its memory buffer. This is similar to the weekly peak example above, except that we want to calculate an average value instead of a peak. We approximate it without direct access to a full month of data by using the Exponential Moving Average algorithm (see http://en.wikipedia.org/wiki/Moving_average , section "Exponential moving average"). In the example below, function calculate_ema() calls the function calculate_long_time_range_aggregate() that was used and described in the previous example:

def calculate_ema(self, input_var_name, aggr_var_name, filter_tags, n_periods):
    """Calculate and export variable that tracks average value using exponential moving average (EMA)

    :param input_var_name:  name of the input variable to aggregate
    :param aggr_var_name:   the name of the variable that the result should be stored in
    :param filter_tags:  a list of tags to filter/match data to aggregate
    :param n_periods:    number of periods (observations) used to calculate smoothing factor for EMA
    """
    self.calculate_long_time_range_aggregate(
        input_var_name, aggr_var_name, filter_tags, lambda cv, av: self.ema(n_periods, cv, av))

def ema(self, n_periods, input_mvar, aggr_mvar):
    """
    See  http://en.wikipedia.org/wiki/Moving_average  Exponential moving average

      workingAverage = (newValue*smoothingFactor) + ( workingAverage * ( 1.0 - smoothingFactor) )
      smoothingFactor = 2 / (1 + n_periods)

    :param n_periods:   number of periods (observations) used to calculate smoothing factor for EMA
    :param input_mvar:  (MonitoringVariable instance) input variable
    :param aggr_mvar:   (MonitoringVariable instance) aggregate variable
    :return:            calculated average
    """
    assert isinstance(input_mvar, MonitoringVariable)
    assert isinstance(aggr_mvar, MonitoringVariable)
    if input_mvar.timeseries:
        current_value = input_mvar.timeseries.getLastValue()
        current_aggr = aggr_mvar.timeseries.getLastNonNaNValue()
        smoothing_factor = 2.0 / (1.0 + n_periods)
        return current_value * smoothing_factor + current_aggr * (1.0 - smoothing_factor)
    else:
        return float('nan')

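To see that this update rule behaves like a long-term average, the same formula can be run on plain numbers. With a constant input, the EMA converges to that input value. This is a self-contained sketch with no NetSpyGlass APIs; the 1-minute polling interval is an assumption used for illustration:

```python
def ema_update(current_value, current_aggr, n_periods):
    # Same formula as in the ema() method above
    smoothing_factor = 2.0 / (1.0 + n_periods)
    return current_value * smoothing_factor + current_aggr * (1.0 - smoothing_factor)

# one week of observations at an assumed 1-minute polling interval
n_periods = 7 * 24 * 3600 / 60.0  # 10080 observations

avg = 0.0
for _ in range(200000):
    # feed a constant input of 100.0; the running average converges to it
    avg = ema_update(100.0, avg, n_periods)
# avg is now very close to 100.0
```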
Function calculate_ema() takes the following arguments:

  • input_var_name : the name of the input variable, e.g. ifInRate
  • aggr_var_name : the name to use for the average variable, e.g. ifInRateWeeklyAvg
  • filter_tags : (list of strings) the tags used as a filter to pick specific instances of the input variable.
  • n_periods : number of observations used to calculate the smoothing factor for the EMA

According to the algorithm, to get a weekly moving average, n_periods should be equal to 7 * 24 * 3600 / polling_interval, where the polling interval is in seconds. The NetSpyGlass Python library nw2functions provides the function nw2functions.polling_interval(), which returns the polling interval in seconds and can be used here. The call to calculate_ema() might look like this:

def execute(self):

    super(UserRules, self).execute()

    # 7 day average
    n_periods = 7 * 24 * 3600 / polling_interval()
    self.calculate_ema('ifInRate',  'ifInRateAvg',  ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174'], n_periods)
    self.calculate_ema('ifOutRate', 'ifOutRateAvg', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174'], n_periods)

To make the new variables appear in the Graphing Workbench, add the following to the bottom of your nw2.conf file:

variables.ifInRateAvg = ${variables.ifInRate}
variables.ifOutRateAvg = ${variables.ifOutRate}

graphingWorkbench.variables += ifInRateAvg
graphingWorkbench.variables += ifOutRateAvg

8.5.13. Operations with string variables

Beginning with v1.0.6, NetSpyGlass supports monitoring variables that have string values. One variable of this type created by default is ifAlias; it is used to read interface descriptions from the device. The corresponding OID (IF-MIB::ifAlias) is polled on every polling cycle, which means information about interface descriptions is always fresh in NetSpyGlass and no longer depends on network discovery.

Note

In fact, the value of interface descriptions in the NetSpyGlass monitoring variables is two polling cycles behind, just like the values of all other variables.

In the following example, we use a Python data processing script to read the latest value of interface descriptions from the variable ifAlias and set the field description in all other interface-related variables. This makes descriptions in various parts of the UI follow those set on the device:

class Nw2Rules(object):
    def __init__(self, log):
        self.log = log

    def copy_interface_description(self, out_var_name):
        for var1, var2 in join(import_var('ifAlias'), import_var(out_var_name)):
            intf_descr = var1.timeseries.getLastValueAsString()
            var2.ds.setDescription(intf_descr)
            export_var(out_var_name, [var2])

    def execute(self):
        self.log.debug('===  Copy interface description from ifAlias')

        self.copy_interface_description('ifOperStatus')
        self.copy_interface_description('ifHCInOctets')
        self.copy_interface_description('ifHCOutOctets')
        self.copy_interface_description('ifInErrors')
        self.copy_interface_description('ifOutErrors')
        self.copy_interface_description('ifInDiscards')
        self.copy_interface_description('ifOutDiscards')
        self.copy_interface_description('ifSpeed')

Function copy_interface_description() matches instances of the variable ifAlias with the instances of another variable, whose name is passed as an argument, using a call to nw2functions.join() (nw2functions.join() matches variable instances by device id and component index). The value taken from ifAlias is then copied to the field description of the net.happygears.nw2.py.DataSource object. This makes the updated description appear everywhere in the UI and in reports.

The code in this example is part of the standard nw2rules.py module.

Note

Use a call to net.happygears.nw2.time_series_buffer.TimeSeriesBuffer.getLastValueAsString() to get the value as a string. If the variable holds numeric values, this method returns the string representation of the last value; that is, it works for both numeric and string variables but always returns a string.

8.5.14. Calculating total monthly traffic value (data cap)

In this example we calculate the monthly amount of traffic that crosses an interface, in bytes. This can be used to keep track of the traffic volume on an Internet connection with a data cap. The code in this example can be generalized to perform any calculation that uses a persistent "accumulator" variable.

This script relies on the function nw2functions.get_or_create() from module nw2functions that takes the following arguments:

  • variable name
  • pseudo-device name this variable should be associated with
  • pseudo-component name it should be associated with
  • (optional) description
  • (optional) initial value

First, this function tries to find a device with the given name. If the device exists, it tries to find an interface or hardware component with the component name passed as the third argument. If such an interface or component exists as well, the function then tries to locate the monitoring variable identified by the triplet of variable name, device id and component index. If this variable exists, it is returned. Note that in this case the variable is returned with its current time series, so that its current value can be used for calculations.

If the device, component or variable does not exist, this function creates it. If it needs to create a device or component, it generates a unique ID for it automatically. The function takes an optional argument initial_value, which can be used to initialize the time series of the created variable.

The script shown in this example does not use this parameter; instead, it calls nw2functions.query_tsdb() in an attempt to fill the time series of the variable with data from the time series database.

The call to nw2functions.filter_by_tags() filters the input variable ifInRate by tag, picking only instances with the tag Link.MyIsp in this example. This tag is just an example; in a real script it could be something like Link.1.2.3.4, where "1.2.3.4" is the IP address of the gateway on the ISP side, or BGP4Peering.AS1000 if BGP peering with the ISP could be discovered. In the end, the filtering picks up the monitoring variable associated with the "right" interface. Function calculate_monthly_traffic() takes the following arguments:

  • input variable (a generator of MonitoringVariable instances)
  • aggregate variable (single MonitoringVariable instance, this is our accumulator variable)

The idea is to call the standard Python function reduce() to add the latest value of the input variables, multiplied by the polling interval (in seconds), to the current value of the aggregate variable. Since the input ifInRate describes the traffic level through the interface in bits/sec, the result of this calculation is the total amount of traffic through the interface, in bits, since the variable was created and initialized with 0.0. To calculate monthly data cap values, calculate_monthly_traffic() resets the value of the aggregate variable once a month. Function beginning_of_month() (not shown here) returns True if the time stamps of the two latest observations in the time series of the monitoring variable correspond to different months. To get the traffic level in bytes, the result is divided by 8 in the call div(import_var('ifInMonthlyTrafficBit'), 8).

If the input is a generator that yields multiple monitoring variable instances, their values are added together. This can be useful if we have multiple connections to MyIsp and want to calculate the total amount of traffic sent to or from this ISP through all connections. If we needed to calculate the sum for each connection separately, the input filter would have to be more selective.

Note

The call to reduce() works because the class MonitoringVariable has Python "magic" methods that allow us to write a + b in Python, where a and b are MonitoringVariable objects. This adds the last values from their time series and stores the result in a.
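The effect of these magic methods can be illustrated with a toy stand-in class. This is purely illustrative; the real MonitoringVariable arithmetic is implemented in the NetSpyGlass runtime, and the 60-second polling interval is an assumption:

```python
from functools import reduce

POLLING_INTERVAL = 60  # seconds; illustrative stand-in for polling_interval()

class FakeMVar:
    """Toy stand-in for MonitoringVariable: holds one 'last value' and
    supports the arithmetic that the reduce() pattern relies on."""
    def __init__(self, value):
        self.value = value

    def __add__(self, other):
        # mimics 'a + b' adding last values and storing the result in a
        self.value += other.value
        return self

    def __mul__(self, scalar):
        # mimics 'mvar * polling_interval()' scaling the last value
        return FakeMVar(self.value * scalar)

accumulator = FakeMVar(0.0)
inputs = [FakeMVar(1000.0), FakeMVar(500.0)]  # bits/sec on two links

total = reduce(lambda x, y: x + y * POLLING_INTERVAL, inputs, accumulator)
# total is the accumulator itself; its value is now
# 1000 * 60 + 500 * 60 = 90000 bits accumulated this cycle
```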

Code:

class Nw2Rules(object):
    def __init__(self, log):
        self.log = log

    def calculate_monthly_traffic(self, input_var, aggregate_mvar):
        """
        Takes values of the latest observations from variables in input_var and if they are not NaN,
        multiplies it by polling interval and adds the result to the value of the last observation in
        time series of the accumulator variable `aggregate_mvar`

        The output variable value is reset on the first minute of the first day
        of each month.

        This basically integrates values of the input and stores result in the output.
        At any given time, the latest value of the output variable is equal to the accumulated
        integrated value of the input from the beginning of the month up to that moment.

        @param input_var:      input variable (iterable or generator)
        @param aggregate_mvar: aggregate variable
        @return: MonitoringVariable instance with the result
        """
        assert isinstance(aggregate_mvar, MonitoringVariable)

        input_var_list = list(skip_nans(input_var))
        if not input_var_list:
            return aggregate_mvar

        agg_val = aggregate_mvar.timeseries.getLastNonNaNValue()
        if math.isnan(agg_val) or self.beginning_of_month(input_var_list[0]):
            agg_val = 0.0

        # Add new observation with the same value as the currently last one but new
        # time stamp - reduce() will use this as starting value
        aggregate_mvar.timeseries.put(current_timestamp(), agg_val)

        # add up values of latest observations in monitoring variable instances
        # input_var_list and aggregation variable itself. Note that if `input_var`
        # yields multiple monitoring variables, then the call to :func:`reduce()`
        # adds values from all of them to the aggregate.
        #
        return reduce(lambda x, y: x + y * polling_interval(), input_var_list, aggregate_mvar)

    def get_and_fill(self, name, devname, component):
        """
        Call :func:`get_or_create()` to try to find the variable in the data pool. If it does
        not exist, create it. In either case, check if its time series buffer is empty. If it
        is empty, try to load the latest 24 hours of data from the TSDB to initialize it. If
        that fails, initialize the time series buffer with 0.0
        """
        traffic_mvar = get_or_create(name, devname, component, '', None)
        assert isinstance(traffic_mvar, MonitoringVariable)
        if not traffic_mvar.timeseries:
            # get latest 24hr of data from tsdb
            triplet = '{0}.{1}.{2}'.format(name, traffic_mvar.ds.deviceId, traffic_mvar.ds.index)
            observations = query_tsdb(triplet, time.time() - 24*3600, 24*3600 / polling_interval())
            if observations:
                # I only need the last observation but should store all of them in case the last one is NaN
                traffic_mvar.timeseries.addAll(observations)
            else:
                # there was nothing in the database
                traffic_mvar.timeseries.put(current_timestamp(), 0.0)
        return traffic_mvar

    def execute(self):

        traffic_mvar = self.get_and_fill('ifInMonthlyTrafficBit', 'EthericNetworks', 'traffic')
        traffic_mvar = self.calculate_monthly_traffic(filter_by_tags(import_var('ifInRate'), ['Link.MyIsp:']), traffic_mvar)
        export_var('ifInMonthlyTrafficBit', [traffic_mvar])

        # convert total accumulated bits to bytes
        export_var('ifInMonthlyTrafficByte', div(import_var('ifInMonthlyTrafficBit'), 8))

8.5.15. Difference between functions new_var() and get_or_create()

We used both functions in the examples in this chapter, and there is a subtle but important difference between them. Both return a monitoring variable object associated with the device and component whose names are passed as arguments. Both functions try to find the device by name first. If the device is found, they try to find the component by name; this can be an interface, hardware component, firewall counter, a tunnel, and so on. If the component is found, the function tries to find a monitoring variable with the given name that tracks this device and component. If any of these steps is unsuccessful, that is, the device, component or variable does not exist, it is created. If the function needs to create the device or component, it generates a unique index for it. If the function creates a device, it adds a special role to mark it as "ephemeral", because it does not correspond to any real device. Even with this role, the device still appears in the Graphing Workbench and can be used to build graphs or alerts; however, it is not saved to the database and does not appear in maps.

The difference between nw2functions.new_var() and nw2functions.get_or_create() is in what they do after they have located the device and component. Function nw2functions.get_or_create() tries to find an existing variable and returns it if it exists, which allows the caller to use its current value. If the variable does not exist, it is created and initialized (if the argument initial_value has been provided in the call to nw2functions.get_or_create()). Function nw2functions.new_var() always returns a temporary monitoring variable object with an empty time series.

We recommend always using nw2functions.get_or_create() instead of nw2functions.new_var(). If your calculation does not require the current value of the variable, simply ignore it and append a new observation to its time series.
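The semantics described above can be modeled with a toy variable pool in plain Python. This is illustrative only; the real functions also create devices and components and operate on MonitoringVariable objects:

```python
pool = {}

def get_or_create(name, device, component, initial_value=None):
    """Return the existing variable (with its current time series) or create one."""
    key = (name, device, component)
    if key not in pool:
        pool[key] = {'timeseries': [] if initial_value is None else [initial_value]}
    return pool[key]

def new_var(name, device, component):
    """Always return a fresh variable with an empty time series."""
    return {'timeseries': []}

# create the accumulator and add an observation to it
v1 = get_or_create('ifInMonthlyTrafficBit', 'EthericNetworks', 'traffic', 0.0)
v1['timeseries'].append(42.0)

v2 = get_or_create('ifInMonthlyTrafficBit', 'EthericNetworks', 'traffic')
# v2 is the same variable; its accumulated time series is preserved

v3 = new_var('ifInMonthlyTrafficBit', 'EthericNetworks', 'traffic')
# v3 has an empty time series regardless of what is in the pool
```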