.. _rules:

Data Processing Rules
*********************

.. highlight:: python
   :linenothreshold: 2

Summary
=======

NetSpyGlass comes with an embedded Python interpreter that is used to process
collected monitoring data. At the end of each polling cycle, the program loads
and runs the Python class defined by the configuration parameter
`monitor.rules`. It operates on lists of monitoring variables and can modify
them, create new variables, create log records, add or modify tags on
monitoring variables and trigger alerts.

Configuration
=============

Configuration parameter `monitor.rules` is a dot-separated path to the Python
class used to process monitoring data. The path should include directories and
the module name. The system assumes the path starts in the same directory where
the configuration file is located. For example::

    monitor.rules = "rules.Rules"

In this case the interpreter loads the class `Rules` from the file `rules.py`,
which is expected to be located in the same directory as the configuration file
`nw2.conf`.

The default configuration loads the class `Nw2Rules` from the module
`nw2rules.py` that ships as part of the internal Java resources. You can find a
copy of this module in the directory `doc` in the distribution tar archive.
This copy is not actually used by the program; it is provided so you can
inspect it and consult its source code when you extend and modify data
processing rules.

.. note::

    The script should be written assuming particular monitoring variables may
    not exist when it runs. The presence of a monitoring variable or of a
    particular instance depends on the state of the network, because variables
    are created using information collected during discovery. For example,
    parts of the script may try to process the variable `tempSensor`; however,
    if the script runs on a network built with devices that have no sensors,
    the corresponding monitoring variable will be empty (that is, it has no
    instances).

.. py:class:: Nw2Rules(log)

    Python class :class:`Nw2Rules` is used to process the data NetSpyGlass
    collects when it polls devices. NetSpyGlass calls the function
    :meth:`execute()` of an instance of this class at the completion of each
    monitoring cycle. Note that a new instance of this class is created every
    time it is used to process collected data.

    :param log: Java logger. To add a log record, call it like this:
        `self.log.info("Log record")`

    .. py:method:: execute()

        Java environment calls this function to execute data processing rules.

To modify the default data processing rules, you need to create your own Python
class that derives from :class:`nw2rules.Nw2Rules`. Here is an example::

    import nw2rules

    class UserRules(nw2rules.Nw2Rules):

        def __init__(self, log):
            super(UserRules, self).__init__(log)

        def execute(self):
            self.log.info("#### UserRules.execute() starts")
            super(UserRules, self).execute()

Assuming this class is saved in the file `rules.py`, the configuration
parameter `monitor.rules` should look like this::

    monitor.rules = "rules.UserRules"

Function :meth:`execute()` in this class should call the same function in the
base class and then can perform its own calculations. See below for more
information on what it can do.

.. note::

    You can turn rule processing off completely by using the string ``none`` as
    the value of the parameter `monitor.rules`. This can be useful when
    NetSpyGlass runs in a cluster configuration where all computations are
    performed by a dedicated "compute" secondary server.
    Example::

        monitor.rules = "none"

How to access monitoring variables
==================================

At the end of each polling cycle NetSpyGlass loads the module, creates an
instance of the class defined by the `monitor.rules` parameter and calls its
function :func:`execute()` to process collected data. To access a monitoring
variable, this function should call :func:`nw2functions.import_var` defined in
the module :mod:`nw2functions`::

    import nw2rules
    from nw2functions import *

    class UserRules(nw2rules.Nw2Rules):

        def __init__(self, log):
            super(UserRules, self).__init__(log)

        def execute(self):
            super(UserRules, self).execute()
            in_octets = import_var('ifHCInOctets')

The call to :func:`import_var` returns a generator of
:class:`MonitoringVariable` objects. Each object collects monitoring data for a
particular variable of one network interface or hardware component of one
device. This can be inbound interface utilization, outbound interface errors,
CPU utilization, the reading of a particular temperature sensor, etc.

Information collected by one instance of a monitoring variable is obtained by
making an SNMP request with one OID, and the name of the monitoring variable
usually matches the name of the OID. This is true for the variables that
collect "raw" or "original" data as it comes from the device. Very often this
data must be processed to make it usable; for example, interface utilization
comes in the form of a byte counter, which needs to be converted to a rate in
bits/sec for representation in the system. This is done in the Python script by
calling built-in functions to compute the rate and multiply it by 8. The result
of this calculation can be stored in a new variable named `ifInRate` using the
following lines of Python code::

    in_octets = import_var('ifHCInOctets')
    in_rate = mul(rate(in_octets), 8)
    export_var('ifInRate', in_rate)

In this example, `in_octets` is a generator that yields objects of class
:class:`MonitoringVariable` that hold the values of the corresponding counters
collected from all interfaces of all devices by the SNMP collector. This
generator can produce thousands of items, where each item corresponds to one
interface of one device. Here we create a new Python variable :py:obj:`in_rate`
which is also a generator of :class:`MonitoringVariable` objects. To make them
available to NetSpyGlass so they can appear in the UI and be stored in the
database, you need to "export" them by calling :func:`export_var`.

How does this work
==================

Function :func:`nw2functions.import_var()` returns a generator that yields
copies of :class:`net.happygears.nw2.py.MonitoringVariable` objects stored in
the internal data pool. Copies are returned in order to make it safer and
easier to manipulate their time series. If the call to
:func:`nw2functions.import_var()` returned the original objects stored in the
internal data pool, all functions that operate on their time series would need
to be careful to modify only the last observation and not touch the others,
because these observations are used for graphing, alerts and reports by the
server, and an error in the Python data processing rules script could easily
cause irreversible data loss. Returning copies and then merging them back into
the data pool when the script calls :func:`nw2functions.export_var()` makes
this safer.
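A consequence of this design is that a calculation whose result is never passed
to :func:`nw2functions.export_var()` simply has no effect: the copies are
discarded at the end of the cycle and nothing reaches the UI or the database.
A minimal sketch (the output variable name `ifInErrorsRate` is made up for
illustration)::

    from nw2functions import *

    # work on copies taken from the internal data pool
    in_errors = import_var('ifInErrors')
    error_rate = rate(in_errors)

    # without this call the calculated rate never reaches the data pool,
    # the UI or the time series database
    export_var('ifInErrorsRate', error_rate)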
In order to reduce the total number of large objects created and destroyed
while the Python data processing script runs, the copies returned by
:func:`nw2functions.import_var()` are taken from a separate pool of
:class:`net.happygears.nw2.py.MonitoringVariable` objects so they can be
reused. These objects are called ``anonymous`` monitoring variables; the size
of this shared pool is monitored using the variable `numAnonVars` in category
`Monitor`.

Using pooled anonymous variables greatly reduces the number of objects that
need to be created and then destroyed on every run of the data processing
script. Consider a NetSpyGlass server that monitors 1000 devices and a total of
100,000 interfaces. In order to calculate inbound interface utilization for
each interface, it executes the following code::

    if_hc_in_octets = import_var('ifHCInOctets')
    export_var('ifInRate', mul(rate(if_hc_in_octets), 8))

The call to :func:`import_var()` returns 100,000 objects (via a generator).
These objects are manipulated by :func:`rate()` and :func:`mul()` and finally
returned back to the system by :func:`export_var()`. Functions :func:`rate()`
and :func:`mul()` do not create new objects; they modify those passed to them
and return the same ones. Function :func:`export_var()` takes the last
observation from the time series buffer of the input variables and merges it
into the time series buffer of the "actual" monitoring variable objects in the
main data pool in the server. This makes the calculated value available for
graphing, reports etc.

The same process has to be repeated for all other monitoring variables that
describe metrics related to interfaces. If we have 10 variables like that,
every run of the script will use 1 million anonymous variables. Pooling these
objects allows us to avoid creating and destroying a million objects on every
script run. In reality the number is greater, because the total number of
interface-related monitoring variables is greater, and the same thing happens
with variables that monitor parameters of hardware components, protocols etc.

To effectively reuse shared anonymous variables, they need to be returned to
the pool as soon as the script is done using them. This is not an easy task
though. The easiest approach is to collect all allocated anonymous variables at
the end of the run of the data processing script. This is not very efficient,
because it means each call to :func:`import_var()` takes a new set of anonymous
variables from the pool, even if they are the same as those already taken
before.

Also, code sequences as shown above form "chains": the call to
:func:`import_var()` only returns a generator, it does not really perform any
calculations or data manipulations. It does not take anonymous variables from
the pool yet, either. What happens is that function :func:`export_var()` starts
calling the chain of generators ``mul -> rate -> import_var``, which takes one
anonymous object from the pool, fills it with data, calculates the rate, then
multiplies and finally merges the result back into the main data pool. At this
point :func:`export_var()` releases the anonymous variable and it is returned
back into the pool of anonymous variables, so that the next iteration done by
:func:`import_var()` can take the same object from the pool. Now, instead of
creating 100,000 anonymous variables we only need one. This dramatically
reduces the total number of objects that exist in the system at the same time,
reducing the required memory footprint of the server. However, this creates
certain caveats.
For example, doing this would not work::

    if_hc_in_octets = list(import_var('ifHCInOctets'))
    export_var('ifInRate', mul(rate(if_hc_in_octets), 8))
    # do some other calculations with if_hc_in_octets here

Normally, the generator returned by the call to :func:`import_var()` can only
be used once. All objects that it yields are consumed by the call to
:func:`rate()` and after that, passing the generator object to other functions
would not work. It would seem that wrapping it in `list()` is a solution:
`list` consumes all objects that the generator can produce and holds references
to them in a Python list. However, since the call to :func:`export_var()`
returns anonymous variables to the pool, the items in that list cannot be used
again, because they have been recycled and are likely to hold data from some
other "original" monitoring variables after the call to :func:`export_var()`.

Another problem with wrapping the result of :func:`import_var()` in a list is
that it defeats the purpose of returning anonymous variables back to the pool
as soon as possible. As was shown above, normally this code sequence needs only
one anonymous variable object; however, if we use the list, it is going to hold
all 100,000 objects at some point and we need 100,000 anonymous variable
objects. All these anonymous variables will be recycled at the end of the run
of the data processing script, so it does not leak them, but the server is
going to have to hold them all in memory for some time.

.. important::

    Do not wrap the result of the call to :func:`import_var()` in a Python list
    to use the same anonymous variables more than once. Instances in this list
    become invalid, and operations with them may yield unexpected and
    unpredictable results if they are used after the call to
    :func:`export_var()` and some other functions from module
    :mod:`nw2functions`.

The call to :func:`import_var()` is cheap, it only creates an iterator. Filling
the returned anonymous variables with data is also very efficient and does not
take a whole lot of time. Holding on to the prepared anonymous variables does
not speed up script execution and is unnecessary. Just call
:func:`import_var()` again if you need to perform different operations with the
same monitoring variables.

Another interesting case with recycling of anonymous variables is related to
filtering functions such as :func:`nw2functions.filter_by_tags()` or
:func:`nw2functions.skip_nans()`. Consider the following code snippet (taken
from one of the examples below in this section)::

    if_out_rate = filter_by_tags(import_var('ifOutRate'), ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])
    aggr = new_var('Cogent', 'peering')
    aggregate(aggr, skip_nans(if_out_rate))
    aggr.addTag('VariableTags.Aggregate')
    export_var('ifOutRate', aggr)

Here we import the variable `ifOutRate` but only want instances that have the
tag 'ifBGP4Peer.AS174' and not 'VariableTags.Aggregate'. In other words, out of
100,000 instances of the variable `ifOutRate` we want to do something with only
a few. The call to :func:`import_var()` will still use 100,000 anonymous
variables (sequentially, not at the same time), but the call to
:func:`filter_by_tags()` will pass through only a few. What happens to those
that it does not pass through? If nothing, then they would linger in the state
"checked out from the pool but not returned" until the end of the run of the
data processing script, and we are back to the situation where we need 100,000
anonymous variables. To avoid this problem, function
:func:`nw2functions.filter_by_tags()` actually recycles the variables it does
not pass through.
The same happens with function :func:`nw2functions.skip_nans()` that skips
monitoring variables where the last value in the time series buffer is a `NaN`.
This function also recycles the variables it does not pass through.

Examples
========

Converting interface counters to interface utilization
--------------------------------------------------------

IF-MIB OIDs `ifHCInOctets` and `ifHCOutOctets` return the value of the internal
64-bit byte counters, but what we really need for the display of monitoring
data is interface utilization in bits/sec. To convert, we need to calculate the
rate and multiply it by 8. We are going to smooth the data a little bit by
computing a sliding window median over 4 data points::

    ifHCInOctets = import_var('ifHCInOctets')
    ifInRate = mul(median(rate(ifHCInOctets, 4)), 8)
    export_var('ifInRate', ifInRate)

    ifHCOutOctets = import_var('ifHCOutOctets')
    ifOutRate = mul(median(rate(ifHCOutOctets, 4)), 8)
    export_var('ifOutRate', ifOutRate)

In this example :func:`rate()` processes only the last 4 observations in the
time series passed to it as input. Function :func:`nw2functions.rate()` is not
going to copy the full time series from the input if it only needs a few
observations. The call to :func:`nw2functions.rate()` returns new
:class:`MonitoringVariable` objects with time series that consist of at most 4
observations. These objects are then fed to the input of
:func:`nw2functions.median()`, which computes the median value of these
observations and returns a :class:`MonitoringVariable` object with a time
series that has only one observation. This object is then exported back to the
NetSpyGlass data processor, which is going to merge it with its own copy of the
:class:`MonitoringVariable` object for the same device and component.

Calculate interface utilization as percentage of speed
--------------------------------------------------------

To do this, use the variable `ifSpeed` to get the interface speed, call
:func:`nw2functions.div()` to divide and :func:`nw2functions.mul()` to multiply
by 100 to get percentages::

    ifInRate = import_var('ifInRate')
    ifSpeed = import_var('ifSpeed')
    ifInRatePercent = mul(div(ifInRate, ifSpeed), 100)
    export_var('ifInRatePercent', ifInRatePercent)

max()
-----

Here is how we can use the Python built-in function `max()` to find the
hardware component with the highest temperature across all components of all
devices we monitor::

    hottestComponent = max(import_var('tempSensor'))

`tempSensor` is a list of monitoring variables that poll temperature sensors
discovered on the devices the system monitors. Each item in this list
corresponds to one sensor on one device. Since :class:`MonitoringVariable`
provides the function :func:`__cmp__()`, instances of this class can be
compared, and a call to :func:`max()` returns the instance with the highest
temperature.
Attributes :attr:`device` and :attr:`ds` can be used to retrieve information
about the device and hardware component::

    log.info("Hottest component: " + hottestComponent.device + " " +
             hottestComponent.ds.descr + ": " +
             str(hottestComponent.timeseries.last().value) + " C")

group_by_device()
-----------------

To find the hardware component with the highest temperature for each device
defined in the system, we could use the following Python code::

    hottest_per_device = (max(x) for x in group_by_device(import_var('tempSensor')) if x is not None)

    def printComponentTemp(var):
        if var.timeseries:
            log.info(" {0:25} | {1:40} | {2:4} {3}".format(
                var.device, var.ds.descr, var.timeseries[-1].value, var.ds.unit))

    map(printComponentTemp, hottest_per_device)

The call to :func:`nw2functions.group_by_device()` groups instances by device
and returns a list of lists of :class:`MonitoringVariable` objects; calling
`max()` on each list created by :func:`nw2functions.group_by_device()` quickly
finds the maximum value for each device. After this, `hottest_per_device` has a
monitoring variable instance for each device for which we monitor at least one
temperature sensor via the variable `tempSensor`. Since the input to the call
to :func:`max()` is a list of :class:`MonitoringVariable` objects, the result
is an instance of the same class, which means it retains the reference to the
:class:`DataSource` object that tells us which device and component it
describes. `hottest_per_device` is a sequence of variables like that.

Built-in Python function :func:`max()` uses the "magic" function
:func:`MonitoringVariable.__cmp__()` that compares one variable instance with
another. This function compares the value of the last data point in the time
series of each instance. Function :func:`max()` returns the variable instance
with the greater last value. Since it returns a reference to the unmodified
original monitoring variable instance, this instance retains its full time
series.

Finally, this example calls the Python built-in `map()` to print each member of
`hottest_per_device` in a neatly formatted table.

Filtering by tags
-----------------

Suppose we want to calculate the total volume of traffic we are sending towards
AS1000, but our network peers with AS1000 in multiple points, so we have to add
up the traffic levels going across several interfaces of different devices. We
can use tag matching to find the interfaces: each peering interface connected
to AS1000 is going to have the tag "BGP4Peer.AS1000". The following call to the
Python built-in function :func:`filter()` picks the right instances of the
variable `ifOutRate`::

    ifOutRateAS1000 = filter(lambda var: 'BGP4Peer.AS1000' in var.tags, ifOutRate)

The list `ifOutRateAS1000` created this way has only components that correspond
to interfaces that peer with AS1000. Now, to calculate the aggregate we can
call `sum()`::

    aggregateRateToAS1000 = [sum(ifOutRateAS1000)]

Do not forget to wrap the variable returned by :func:`sum()` in a list "[ ]"
because NetSpyGlass expects all monitoring variables to be lists or generators,
but :func:`sum()` returns a scalar.

Python function :func:`sum()` relies on the "magic" function
:func:`MonitoringVariable.__add__()` that can add a MonitoringVariable instance
and a constant, or two MonitoringVariable instances. Function :func:`__add__()`
returns a new copy of the :class:`MonitoringVariable` object to follow the
standard Python semantics of :func:`__add__()`.
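The same selection can also be written with :func:`nw2functions.filter_by_tags()`,
which, as described earlier, returns the anonymous variables it does not pass
through back to the pool. A short sketch using the same tag as above; see the
"Calculating aggregates" section below for how to attach the result to a
pseudo-device with :func:`new_var()` before exporting it::

    from nw2functions import *

    # same selection, using filter_by_tags() from nw2functions instead of
    # the Python built-in filter(); non-matching instances are recycled
    if_out_rate_as1000 = filter_by_tags(import_var('ifOutRate'), ['BGP4Peer.AS1000'])
    aggregate_rate_to_as1000 = [sum(skip_nans(if_out_rate_as1000))]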
Print variable attributes to log
--------------------------------

This can be useful for debugging::

    def print_tags(var):
        log.info(var.device + ':' + var.component + ' ' + str(var.tags))

    map(print_tags, self.vars['ifOperStatus'])

Tag manipulation
----------------

Sometimes it can be useful to add or remove monitoring variable tags. For
example, one rule in the rule processing script may add a tag to variables
depending on their values, while another rule later in the script may do
something depending on this tag. Tags added this way are not stored in the
database, but since the program re-evaluates rules every 30 sec, the rule that
adds the tag will keep adding it, making it work as if it were persistent.

Here is an example of how this could be done::

    def func(var):
        ts = var.timeseries[-1].timestamp
        value = var.timeseries[-1].value
        if value > 2000:
            var.tags.add('ColorLevel.2')
            var.tags.add('Color.red')
        else:
            var.tags.add('ColorLevel.1')
            var.tags.add('Color.green')

    mv1 = import_var('ifInRate')    # any monitoring variable with numeric values

    map(func, mv1)

The standard data processing script also does this to convert the operational
status of interfaces ("up" or "down") to tags in the facet `ifOperStatus`.
These tags make it possible to filter interfaces by operational status in the
device details panel using the tag filter. Here is how this is done::

    def setInterfaceOpStatusTag(self, var):
        """
        Set tag to reflect interface oper status. Interfaces with status "up"
        get tag "ifOperStatus.Up", those in status "Down" get
        "ifOperStatus.Down". If variable has no time series data, skip it and
        do nothing.

        :param var: MonitoringVariable instance
        """
        if len(var.timeseries) > 1:
            if self.isInterfaceUp(var):
                var.tags.add("ifOperStatus.Up")
            else:
                var.tags.add("ifOperStatus.Down")

    def execute(self, vars):
        self.log.info("=== Set Tags")
        map(self.setInterfaceOpStatusTag, vars['ifOperStatus'])

Calculating aggregates
----------------------

The following example computes the sum of outbound traffic through interfaces
of all devices that have the tag 'ifBGP4Peer.AS174'. This tag is added
automatically to all interfaces that carry BGP peering sessions with AS174
(COGENT). The calculated result is stored in a new monitoring variable instance
that is added back to the same variable 'ifOutRate'. The new instance has
device name "Cogent" and component "peering". The code looks like this::

    import nw2rules
    from nw2functions import *

    class UserRules(nw2rules.Nw2Rules):

        def __init__(self, log):
            super(UserRules, self).__init__(log)

        def calculate_aggregate(self):
            if_out_rate = filter_by_tags(import_var('ifOutRate'),
                                         ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])
            aggr = new_var('Cogent', 'peering')
            aggregate(aggr, if_out_rate)
            aggr.addTag('VariableTags.Aggregate')
            export_var('ifOutRate', aggr)

The actual work is done by the function :func:`nw2functions.aggregate()` that
adds the values of the latest observations of the variable instances passed as
the second argument and stores the result in the monitoring variable passed to
it as the first argument.

To pick up only interfaces that carry traffic to AS174, we use filtering by
tags::

    filter_by_tags(import_var('ifOutRate'), ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])

Here we import the variable "ifOutRate" and pass it to
:func:`nw2functions.filter_by_tags()`, which matches instances by the tags
passed as the second argument. If a tag is preceded with a "!", the function
matches only if this tag is not present in the variable. In this example we
pick monitoring variable instances that have the tag "ifBGP4Peer.AS174" and do
not have the tag "VariableTags.Aggregate".
Matching on the absence of "VariableTags.Aggregate" helps avoid adding the
calculated aggregate value to itself. Function
:func:`nw2functions.filter_by_tags()` returns a generator that yields matching
monitoring variable instances.

Function :func:`aggregate()` has a side effect in that it sets the tags in the
destination variable to the intersection of the tag sets of all input
monitoring variable instances.

In the end, we add the calculated aggregated instance to the same monitoring
variable 'ifOutRate' by calling :func:`nw2functions.export_var()` with this
name. We can distinguish this new instance by its device and component name
("Cogent" and "peering" respectively).

The following example is in fact a copy of the internal implementation of
:func:`nw2functions.aggregate()`; it illustrates how the Python built-in
function :func:`reduce()` can be used to calculate an aggregate::

    def aggregate(aggr_var, mvlist):
        aggr_var.clearTags()  # let MonitoringVariable.__add__() assign common set of tags to this
        reduce(lambda x, y: x + y, mvlist, aggr_var)
        return aggr_var

We can use :func:`reduce()` because the Java class MonitoringVariable has the
"magic" Python function :func:`__add__()` to add a constant or a value of
another MonitoringVariable object. This function works with the latest
observations in the time series of both objects and overwrites the time series
of the MonitoringVariable object it is called on, leaving it with a time series
of length 1. Function :func:`__add__()` has a side effect in that it sets the
tags of the object it is called on to the intersection of the sets of tags of
it and the second MonitoringVariable object it operates on.

The next example takes a more direct approach and calculates the aggregate
value of the new variable without the aid of the function
:func:`nw2functions.aggregate()`. Here, we calculate the sum of values of
variables tracking firewall counters with the same name configured on different
routers::

    import nw2rules
    from nw2functions import *

    COUNTER_NAME = 'discard-ge-0/0/0.0-i'
    TAG = 'CounterName.' + COUNTER_NAME

    class UserRules(nw2rules.Nw2Rules):

        def __init__(self, log):
            super(UserRules, self).__init__(log)

        def calculate_aggregate(self, aggr, var_name):
            ts = 0
            new_value = 0.0
            for fw_counter_var in filter_by_tags(import_var(var_name), [TAG, '!VariableTags.Aggregate']):
                copy_attrs(aggr, fw_counter_var)
                new_value += fw_counter_var.timeseries.getLastValue()
                ts = fw_counter_var.timeseries.getLastTimeStamp()
            aggr.timeseries.put(ts, new_value)

        def execute(self):
            super(UserRules, self).execute()

            aggr = new_var('Attackers', 'Discarded_Combined')
            aggr.addTag('VariableTags.Aggregate')
            self.calculate_aggregate(aggr, 'fwCntrPacketRate')
            export_var('fwCntrPacketRate', [aggr])

            aggr = new_var('Attackers', 'Discarded_Combined')
            self.calculate_aggregate(aggr, 'fwCntrByteRate')
            export_var('fwCntrByteRate', [aggr])

As before, we create a new variable by calling :func:`nw2functions.new_var()`
and give it made-up device and component names. Then we import the variable
'fwCntrPacketRate' or 'fwCntrByteRate' and iterate over its instances, looking
for those with a tag matching the counter name. Then we take the last time
stamp and value from the input variable and calculate the aggregate.
Once this code is executed by the NetSpyGlass server, a row with device name
"Attackers" and component "Discarded_Combined" should appear in the Graphing
Workbench as a member of the monitoring variables "fwCntrPacketRate" and
"fwCntrByteRate".

Aggregates and NaN values
^^^^^^^^^^^^^^^^^^^^^^^^^

When function :func:`nw2functions.aggregate()` adds up values of monitoring
variable instances to calculate the aggregate value, the result is affected if
the latest observation in one or more input variables has the value `NaN`.
Since the result of addition of a number and `NaN` is `NaN`, the calculated
aggregate becomes `NaN` even if only one of the input variables has this value.
This happens, for example, when a device is taken offline so NetSpyGlass cannot
poll it. In this case the monitoring variables created for it still linger for
some time until they expire, but their values are `NaN`. If these variables are
used to calculate some kind of aggregate value, the aggregate value becomes
`NaN`, too. This can be counter-intuitive, since we do not want the aggregate
graph to break just because one device has been taken offline. To work around
this issue, you can use function :func:`nw2functions.skip_nans()` to filter out
variables with latest value `NaN`::

    import nw2rules
    from nw2functions import *

    class UserRules(nw2rules.Nw2Rules):

        def __init__(self, log):
            super(UserRules, self).__init__(log)

        def calculate_aggregate(self):
            if_out_rate = filter_by_tags(import_var('ifOutRate'),
                                         ['ifBGP4Peer.AS174', '!VariableTags.Aggregate'])
            aggr = new_var('Cogent', 'peering')
            aggregate(aggr, skip_nans(if_out_rate))   # skip_nans() skips variable instances with value NaN
            aggr.addTag('VariableTags.Aggregate')
            export_var('ifOutRate', aggr)

Calculating average value using data that fits in the memory buffer
---------------------------------------------------------------------

To calculate the average value of a variable over the data kept in memory, we
access the time series buffer of the variable like this::

    for mvar in import_var('ifInRate'):
        avg = 0
        cntr = 0
        for observation in mvar.timeseries:
            if observation.isNaN():
                continue
            avg += observation.value
            cntr += 1
        if cntr == 0:
            avg = float('nan')
        else:
            avg /= cntr
        avg_mvar = copy(mvar)
        avg_mvar.timeseries.clear()
        avg_mvar.timeseries.put(current_timestamp(), avg)
        export_var('ifInRateAvg', [avg_mvar])

Note how we skip NaNs in the time series while calculating the average.

Calculating 95th percentile
---------------------------

The 95th percentile value of interface utilization is used for Internet billing
and capacity planning, and is an often requested metric.
The NetSpyGlass Python library offers the function
:py:func:`nw2functions.percentile()` that can be used to do this::

    def calculate_percentile(self, input_var_name, p_var_name, filter_tags, threshold_percent):
        for mvar in filter_by_tags(import_var(input_var_name), filter_tags):
            ts = current_timestamp()
            p_value = percentile(mvar, threshold_percent)
            p_mvar = copy(mvar)
            p_mvar.timeseries.clear()
            p_mvar.timeseries.put(ts, p_value)
            export_var(p_var_name, [p_mvar])

    def execute(self):
        self.calculate_percentile('ifInRate', 'ifInRateP95', ['ifAdminStatus.Up'], 95)
        self.calculate_percentile('ifOutRate', 'ifOutRateP95', ['ifAdminStatus.Up'], 95)

Function :py:func:`nw2functions.percentile()` takes the following arguments:

- `mvar` : input monitoring variable
- `threshold_percent` : threshold as a percentage of the total number of observations in `mvar`

Function :py:func:`nw2functions.percentile()` takes observations from the time
series of its argument `mvar`, sorts them and returns the value of the
observation that is nearest to the `threshold_percent` of observations (it uses
the "Nearest Rank" method).

Function :py:func:`calculate_percentile()` in the example above takes the
following arguments:

- `input_var_name` : the name of the input variable, e.g. `ifInRate`
- `p_var_name` : the name to use for the percentile variable, e.g. `ifInRateP95`
- `filter_tags` : (list of strings) the tags used as a filter to pick specific instances of the input variable
- `threshold_percent` : threshold as a percentage of the total number of observations (e.g. 95 for the 95th percentile)

For each monitoring variable instance picked from the input variable, function
:py:func:`calculate_percentile()` calls :py:func:`nw2functions.percentile()` to
calculate the N-th percentile value, then creates a new monitoring variable
with the name `p_var_name` as a copy of the input variable and adds the
calculated value as a new observation to it. Finally, the new variable is
exported back to the data processor to be merged with its copy in the variable
pool. Call :py:func:`calculate_percentile()` with a different variable name and
`threshold_percent` value to calculate the 75th or 99th percentile.

Calculating weekly peak values
------------------------------

The following example creates a new monitoring variable with values equal to
the weekly peak value of the corresponding input variable. The peak value is
reset every Monday at 00:00 UTC and the calculation starts over from the
beginning.

The difficulty with this calculation is that the NetSpyGlass server usually
does not have data in memory that spans a whole week. The typical size of the
memory buffer is measured in hours and can be between 1 hour (the default) and
1 day. We can't simply call :py:func:`max(mvar.timeseries)` to find the
observation with the maximum value, because this only works on the data in
memory. Instead, we compare the current value of the variable that holds the
peak value with the current value of the input variable and assign the greater
one back to the peak variable. If we did nothing else, the peak variable would
hold the maximum value of the input variable for all time since the peak
variable was created. To avoid this, we reset its value on Monday morning,
which makes it track the peak value of the input for the calendar week.

We actually calculate two weekly peak values, one for the inbound interface
utilization and the other for the outbound utilization. The first variable is
`ifInRateWeeklyPeak`; it is calculated using data from the variable `ifInRate`.
The pair of variables for the output utilization is 'ifOutRate' and 'ifOutRateWeeklyPeak'. The calculation is done by the function :py:func:`calculate_weekly_peak_var()` that takes three arguments: - `input_var_name` : the name of the input variable, e.g. `ifInRate` - `peak_var_name` : the name to use for the weekly peak variable, e.g. `ifInRateWeeklyPeak` - `filter_tags` : (list of strings) the tags to be used as a filter to pick specific instances of the input variable to use. Here is the code:: import datetime import nw2rules from nw2functions import * class UserRules(nw2rules.Nw2Rules): def __init__(self, log): super(UserRules, self).__init__(log) def calculate_long_time_range_aggregate(self, input_var_name, avg_var_name, filter_tags, aggr_func): """Calculate and export variable that tracks some aggregate value over long time interval :param input_var_name: name of the input variable to aggregate :param avg_var_name: the name of the variable that the result should be stored in :param filter_tags: a list of tags to filter/match data to aggregate :param aggr_func: a function that calculate aggregate value """ input_var_gen = filter_by_tags(import_var(input_var_name), filter_tags) avg_var_gen = filter_by_tags(import_var(avg_var_name), filter_tags) # self.log.info('Filtered %s vars to calculate EMA value from: %s %s' % # (len(input_var), input_var_name, filter_tags)) # self.log.info('Filtered %s EMA value vars from: %s %s' % # (len(avg_var), avg_var_name, filter_tags)) for input_mvar, aggr_mvar in left_join(input_var_gen, avg_var_gen): current_value = input_mvar.timeseries.getLastValue() if aggr_mvar is None: # we don't have this variable yet current_aggr = float('nan') aggr_mvar = copy(input_mvar) aggr_value = current_value else: current_aggr = aggr_mvar.timeseries.getLastNonNaNValue() aggr_value = aggr_func(input_mvar, aggr_mvar) aggr_mvar.timeseries.clear() aggr_mvar.timeseries.put(current_timestamp(), aggr_value) self.log.info('Updating variable %s.%s.%s : current_value=%.1f current_aggr=%.1f aggr_value=%.1f' % (avg_var_name, input_mvar.ds.getNode().getId(), input_mvar.ds.getIndex(), current_value, current_aggr, aggr_value)) export_var(avg_var_name, [aggr_mvar]) def weekly_peak(self, input_mvar, aggr_mvar): """ Calculates max() of the two values, except at 00:00 on Monday when it returns current value :param input_mvar: (MonitoringVariable instance) input variable :param aggr_mvar: (MonitoringVariable instance) aggregate variable :return: calculated average """ assert isinstance(input_mvar, MonitoringVariable) assert isinstance(aggr_mvar, MonitoringVariable) if input_mvar.timeseries: current_value = max(input_mvar.timeseries).value current_aggr = aggr_mvar.timeseries.getLastNonNaNValue() if self.is_monday_morning(): return input_mvar.timeseries.getLastNonNaNValue() else: return max(current_value, current_aggr) else: return float('nan') def is_monday_morning(self): """ Return true if current time is Monday 00:00 UTC """ now = datetime.datetime.utcnow().utctimetuple() return now.tm_wday == 0 and now.tm_hour == 0 and now.tm_min == 0 def calculate_weekly_peak_var(self, input_var_name, aggr_var_name, filter_tags): """Calculate and export variable that tracks peak value for the current week. 
:param input_var_name: name of the input variable to aggregate :param aggr_var_name: the name of the variable that the result should be stored in :param filter_tags: a list of tags to filter/match data to aggregate """ self.calculate_long_time_range_aggregate( input_var_name, aggr_var_name, filter_tags, self.weekly_peak) def execute(self): super(UserRules, self).execute() self.calculate_weekly_peak_var('ifInRate', 'ifInRateWeeklyPeak', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174']) self.calculate_weekly_peak_var('ifOutRate', 'ifOutRateWeeklyPeak', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174']) The "heavy lifting" is done by function :py:func:`calculate_long_time_range_aggregate()`: - we use `input_var = filter_by_tags(import_var(input_var_name), filter_tags)` to filter input variable instances and variables that hold peak values by provided tags. Function :py:func:`nw2functions.filter_by_tags()` returns generator. - We use library function :py:func:`nw2functions.left_join()` to match input variables and variables where we store peak values by device and component. Function :py:func:`nw2functions.left_join()` returns tuples of `MonitoringVariable` objects but if it can't find matching item in the list passed as second argument, it returns tuple where second item is `None`. - There are two possible cases here: 1) we already have the peak variable or 2) we don't have it yet. To check if the peak variable exists, we check the second item in the tuple returned by :py:func:`nw2functions.left_join()`. If it is None, we don't have corresponding peak variable yet, so we create it as a copy of the input variable and initialize its value using current value of the input variable. - To calculate aggregate value, we use function passed to :py:func:`calculate_long_time_range_aggregate()` as last argument. This is expected to be a function of two arguments: the input variable instance and aggregate variable instance (in this case, peak). - function :py:func:`weekly_peak()` returns :py:func:`max()` of its two arguments, except at 00:00 on Mondays when it returns current value of the input variable (its first argument). This provides for reset of the weekly peak on Monday morning. To make new variables appear in the GW, add the following to the bottom of your nw2.conf file:: variables.ifInRateWeeklyPeak = ${variables.ifInRate} variables.ifOutRateWeeklyPeak = ${variables.ifOutRate} graphingWorkbench.variables += ifInRateWeeklyPeak graphingWorkbench.variables += ifOutRateWeeklyPeak Calculating long term average values ------------------------------------ Another interesting example is the task of calculating long term average value of a variable for the time intervals longer than what NetSpyGlass keeps in memory buffer. This is similar to the example with weekly peak value (see above) except we want to calculate average value instead of the peak. We approximate it without having direct access to the full month worth of data using Exponential Moving Average algorithm (see http://en.wikipedia.org/wiki/Moving_average , section "Exponential moving average"). 
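The recurrence used in the example below is, in pseudocode (with the smoothing
factor derived from the number of observations `n_periods`, as in the docstring
of the :py:func:`ema()` function shown next)::

    smoothing_factor = 2.0 / (1.0 + n_periods)
    new_average = current_value * smoothing_factor + previous_average * (1.0 - smoothing_factor)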
In the example below, function :py:func:`calculate_ema()` calls the function
:py:func:`calculate_long_time_range_aggregate()` that was used and described in
the previous example::

    def calculate_ema(self, input_var_name, aggr_var_name, filter_tags, n_periods):
        """Calculate and export variable that tracks average value using
        exponential moving average (EMA)

        :param input_var_name: name of the input variable to aggregate
        :param aggr_var_name: the name of the variable that the result should be stored in
        :param filter_tags: a list of tags to filter/match data to aggregate
        :param n_periods: number of periods (observations) used to calculate smoothing factor for EMA
        """
        self.calculate_long_time_range_aggregate(
            input_var_name, aggr_var_name, filter_tags, lambda cv, av: self.ema(n_periods, cv, av))

    def ema(self, n_periods, input_mvar, aggr_mvar):
        """
        See http://en.wikipedia.org/wiki/Moving_average , section "Exponential moving average"

            workingAverage = (newValue * smoothingFactor) + (workingAverage * (1.0 - smoothingFactor))
            smoothingFactor = 2 / (1 + n_periods)

        :param n_periods: number of periods (observations) used to calculate smoothing factor for EMA
        :param input_mvar: (MonitoringVariable instance) input variable
        :param aggr_mvar: (MonitoringVariable instance) aggregate variable
        :return: calculated average
        """
        assert isinstance(input_mvar, MonitoringVariable)
        assert isinstance(aggr_mvar, MonitoringVariable)
        if input_mvar.timeseries:
            current_value = input_mvar.timeseries.getLastValue()
            current_aggr = aggr_mvar.timeseries.getLastNonNaNValue()
            smoothing_factor = 2.0 / (1.0 + n_periods)
            return current_value * smoothing_factor + current_aggr * (1.0 - smoothing_factor)
        else:
            return float('nan')

Function :py:func:`calculate_ema()` takes the following arguments:

- `input_var_name` : the name of the input variable, e.g. `ifInRate`
- `aggr_var_name` : the name to use for the average variable, e.g. `ifInRateAvg`
- `filter_tags` : (list of strings) the tags used as a filter to pick specific instances of the input variable
- `n_periods` : number of observations used to calculate the EMA

According to the algorithm, to get a weekly moving average `n_periods` should
be equal to `7 * 24 * 3600 / polling_interval`, where the polling interval is
in seconds. The NetSpyGlass Python library `nw2functions` provides the function
:py:func:`nw2functions.polling_interval()` that returns the polling interval in
seconds and can be used here. The call to :py:func:`calculate_ema()` might look
like this::

    def execute(self):
        super(UserRules, self).execute()
        # 7 day average
        n_periods = 7 * 24 * 3600 / polling_interval()
        self.calculate_ema('ifInRate', 'ifInRateAvg', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174'], n_periods)
        self.calculate_ema('ifOutRate', 'ifOutRateAvg', ['ifRole.eBgpPeer', 'ifBGP4Peer.AS174'], n_periods)

To make the new variables appear in the Graphing Workbench, add the following
to the bottom of your nw2.conf file::

    variables.ifInRateAvg = ${variables.ifInRate}
    variables.ifOutRateAvg = ${variables.ifOutRate}

    graphingWorkbench.variables += ifInRateAvg
    graphingWorkbench.variables += ifOutRateAvg

Operations with string variables
--------------------------------

Beginning with v1.0.6, NetSpyGlass supports monitoring variables that have
string values. One variable of this type, created by default, is `ifAlias`; it
is used to read interface descriptions from the device.
The corresponding OID (IF-MIB::ifAlias) is polled on every polling cycle, which
means information about interface descriptions is always fresh in NetSpyGlass
and does not depend on network discovery anymore.

.. note::

    In fact, the value of interface descriptions in the NetSpyGlass monitoring
    variables is two polling cycles behind, just like the value of all other
    variables.

In the following example we use the Python data processing script to read the
latest value of interface descriptions from the variable `ifAlias` and set the
field `description` in all other interface-related variables. This makes
descriptions in various parts of the UI follow those set on the device::

    class Nw2Rules(object):

        def __init__(self, log):
            self.log = log

        def copy_interface_description(self, out_var_name):
            for var1, var2 in join(import_var('ifAlias'), import_var(out_var_name)):
                intf_descr = var1.timeseries.getLastValueAsString()
                var2.ds.setDescription(intf_descr)
                export_var(out_var_name, [var2])

        def execute(self):
            self.log.debug('=== Copy interface description from ifAlias')
            self.copy_interface_description('ifOperStatus')
            self.copy_interface_description('ifHCInOctets')
            self.copy_interface_description('ifHCOutOctets')
            self.copy_interface_description('ifInErrors')
            self.copy_interface_description('ifOutErrors')
            self.copy_interface_description('ifInDiscards')
            self.copy_interface_description('ifOutDiscards')
            self.copy_interface_description('ifSpeed')

Function :func:`copy_interface_description` matches instances of the variable
`ifAlias` with the instances of another variable, whose name is passed as an
argument, using a call to :func:`nw2functions.join()`
(:func:`nw2functions.join()` matches variable instances by device id and
component index). The value taken from `ifAlias` is then copied to the field
`description` in the :class:`net.happygears.nw2.py.DataSource`. This makes the
updated description appear everywhere in the UI and reports. The code in this
example is part of the standard `nw2rules.py` module.

.. note::

    Use a call to
    :func:`net.happygears.nw2.time_series_buffer.TimeSeriesBuffer.getLastValueAsString()`
    to get the value as a string. If the variable holds numeric values, this
    method returns the string representation of the last value; that is, it
    works for both numeric and string variables but always returns a string.

.. _calculating_traffic_cap:

Calculating total monthly traffic value (data cap)
--------------------------------------------------

In this example we calculate the monthly amount of traffic that crosses an
interface, in bytes. This can be used to keep track of the traffic volume on an
Internet connection with a data cap. The code used in this example can be
generalized to perform any calculation using a persistent "accumulator"
variable.

This script relies on the function :func:`nw2functions.get_or_create()` from
module `nw2functions` that takes the following arguments:

- variable name
- pseudo-device name this variable should be associated with
- pseudo-component name it should be associated with
- (optional) description
- (optional) initial value

First, this function tries to find a device with the given name. If the device
exists, it tries to find an interface or hardware component with the component
name passed as the third argument. If such an interface or component exists as
well, the function then tries to locate the monitoring variable identified by
the triplet constructed from the variable name, device id and component index.
If this variable exists, it is returned.
Note that in this case the variable is returned with its current time series,
so that its current value can be used for calculations.

If the device, component or variable does not exist, this function creates it.
If it needs to create a device or component, it generates a unique ID for it
automatically.

This function takes an optional argument `initial_value`, which can be used to
initialize the time series of the created variable. The script shown in this
example does not use this parameter; instead, it calls
:func:`nw2functions.query_tsdb()` in an attempt to fill the time series of the
variable with data from the Time Series Database.

The call to :func:`nw2functions.filter_by_tags()` filters the input variable
`ifInRate` by tag, picking only instances with the tag `Link.MyIsp` in this
example. This tag is just an example; in a real script it can be something like
`Link.1.2.3.4`, where "1.2.3.4" is the IP address of the gateway on the ISP
side, or `BGP4Peering.AS1000` if BGP peering with the ISP could be discovered.
In the end, the filtering picks up the monitoring variable associated with the
"right" interface.

Function :func:`calculate_monthly_traffic()` takes the following arguments:

- input variable (a generator of :class:`MonitoringVariable` instances)
- aggregate variable (a single :class:`MonitoringVariable` instance, this is our accumulator variable)

The idea is to call the standard Python function :func:`reduce()` to add the
latest value of the input variables, multiplied by the polling interval (in
seconds), to the current value of the aggregate variable. Since the input
`ifInRate` describes the traffic level through the interface in `bits/sec`, the
result of this calculation is the total amount of traffic through the interface
in bits ever since the variable was created and initialized with "0.0". To
calculate monthly data cap values, :func:`calculate_monthly_traffic()` resets
the value of the aggregate variable once a month. Function
:func:`beginning_of_month()` (not shown here) returns `True` if the time stamps
of the two latest observations in the time series of the monitoring variable
correspond to different months.

To get the traffic level in bytes, the result is divided by 8 in the call
`div(import_var('ifInMonthlyTrafficBit'), 8)`.

If the input is a generator that yields multiple monitoring variable instances,
their values will be added together. This can be useful if we have multiple
connections to `MyIsp` and want to calculate the total amount of traffic sent
to or from this ISP through all connections. If we needed to calculate the sum
for each connection separately, the input filter would need to be more
selective.

.. note::

    The call to :func:`reduce()` works because class
    :class:`MonitoringVariable` has Python "magic" functions that allow us to
    write, in Python, `a + b` where `a` and `b` are :class:`MonitoringVariable`
    objects. This adds the last values from their time series and stores the
    result in `a`.

Code::

    import math
    import time

    import nw2rules
    from nw2functions import *

    class UserRules(nw2rules.Nw2Rules):

        def __init__(self, log):
            super(UserRules, self).__init__(log)

        def calculate_monthly_traffic(self, input_var, aggregate_mvar):
            """
            Takes values of the latest observations from variables in input_var
            and if they are not NaN, multiplies them by the polling interval and
            adds the result to the value of the last observation in the time
            series of the accumulator variable `aggregate_mvar`

            The output variable value is reset on the first minute of the first
            day of each month.

            This basically integrates values of the input and stores the result
            in the output.
            At any given time, the latest value of the output variable is equal
            to the accumulated integrated value of the input from the beginning
            of the month up to that moment.

            @param input_var: input variable (iterable or generator)
            @param aggregate_mvar: aggregate variable
            @return: MonitoringVariable instance with the result
            """
            assert isinstance(aggregate_mvar, MonitoringVariable)

            input_var_list = list(skip_nans(input_var))
            if not input_var_list:
                return aggregate_mvar

            agg_val = aggregate_mvar.timeseries.getLastNonNaNValue()
            if math.isnan(agg_val) or self.beginning_of_month(input_var_list[0]):
                agg_val = 0.0

            # Add new observation with the same value as the currently last one
            # but a new time stamp - reduce() will use this as starting value
            aggregate_mvar.timeseries.put(current_timestamp(), agg_val)

            # add up values of latest observations in monitoring variable instances
            # input_var_list and the aggregation variable itself. Note that if
            # `input_var` yields multiple monitoring variables, then the call to
            # :func:`reduce()` adds values from all of them to the aggregate.
            #
            return reduce(lambda x, y: x + y * polling_interval(), input_var_list, aggregate_mvar)

        def get_and_fill(self, name, devname, component):
            """
            Call :func:`get_or_create()` to try to find variable in the data pool.
            If it does not exist, create it. In either case, check if its
            timeseries buffer is empty. If it is empty, try to load latest 24hr
            of data from TSDB to initialize it. If that fails, initialize time
            series buffer with "0.0"
            """
            traffic_mvar = get_or_create(name, devname, component, '', None)
            assert isinstance(traffic_mvar, MonitoringVariable)
            if not traffic_mvar.timeseries:
                # get latest 24hr of data from tsdb
                triplet = '{0}.{1}.{2}'.format(name, traffic_mvar.ds.deviceId, traffic_mvar.ds.index)
                observations = query_tsdb(triplet, time.time() - 24*3600, 24*3600 / polling_interval())
                if observations:
                    # I only need the last observation but should store all of
                    # them in case the last one is NaN
                    traffic_mvar.timeseries.addAll(observations)
                else:
                    # there was nothing in the database
                    traffic_mvar.timeseries.put(current_timestamp(), 0.0)
            return traffic_mvar

        def execute(self):
            super(UserRules, self).execute()

            traffic_mvar = self.get_and_fill('ifInMonthlyTrafficBit', 'EthericNetworks', 'traffic')
            traffic_mvar = self.calculate_monthly_traffic(
                filter_by_tags(import_var('ifInRate'), ['Link.MyIsp']), traffic_mvar)
            export_var('ifInMonthlyTrafficBit', [traffic_mvar])

            # convert total accumulated bits to bytes
            export_var('ifInMonthlyTrafficByte', div(import_var('ifInMonthlyTrafficBit'), 8))

Difference between functions new_var() and get_or_create()
------------------------------------------------------------

We used both functions in the examples in this chapter, but there is a subtle
and important difference between them.

Both return a monitoring variable object associated with the device and
component with the names passed as arguments. Both functions try to find the
device by name first. If the device is found, then they try to find the
component by name. This can be an interface, a hardware component, a firewall
counter, a tunnel, and so on. If the component is found, the function tries to
find the monitoring variable with the given name that tracks this device and
component. If any of these steps was unsuccessful, that is, the device,
component or variable does not exist, then it will be created. If we need to
create the device or component, we generate a unique index for it.
If the function creates a device, it adds a special role to it to mark it as
"ephemeral", because it does not correspond to any "real" device. Even though
it has a role like this, it still appears in the Graphing Workbench and can be
used to build graphs or alerts. This device is not saved to the database and
does not appear in maps, though.

The difference between :func:`nw2functions.new_var()` and
:func:`nw2functions.get_or_create()` is in what they do after they have located
the device and component.

Function :func:`nw2functions.get_or_create()` tries to find an existing
variable and returns it if it exists. This allows the caller to use its current
value. If the variable does not exist, it is created and initialized (if the
argument `initial_value` has been provided in the call to
:func:`nw2functions.get_or_create()`).

Function :func:`nw2functions.new_var()` always returns a temporary monitoring
variable object with an empty time series.

We recommend always using :func:`nw2functions.get_or_create()` instead of
:func:`nw2functions.new_var()`. If your calculation does not require the
current value of the variable, simply ignore it and append a new observation to
its time series.
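The following short sketch, assembled from the examples earlier in this
chapter, illustrates the difference (the variable, device and component names
are the same made-up names used in those examples)::

    from nw2functions import *

    # get_or_create() returns the existing variable if it is already in the
    # data pool, so its current value can be read and used as an accumulator
    counter = get_or_create('ifInMonthlyTrafficBit', 'EthericNetworks', 'traffic', '', None)
    previous_value = counter.timeseries.getLastNonNaNValue()

    # new_var() always returns a temporary variable with an empty time series;
    # it is suitable when the previous value is not needed, e.g. for aggregates
    aggr = new_var('Cogent', 'peering')
    aggregate(aggr, skip_nans(import_var('ifOutRate')))
    export_var('ifOutRate', aggr)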