Source code for module.module

#!/usr/bin/python

# -*- coding: utf-8 -*-

# Copyright (C) 2009-2012:
#    Gabes Jean, naparuba@gmail.com
#    Gerhard Lausser, Gerhard.Lausser@consol.de
#    Gregory Starck, g.starck@gmail.com
#    Hartmut Goebel, h.goebel@goebel-consult.de
#    Thibault Cohen, thibault.cohen@savoirfairelinux.com
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken.  If not, see <http://www.gnu.org/licenses/>.

"""
Collectd Plugin for Receiver or arbiter
"""

import os
import time
from itertools import izip

from shinken.basemodule import BaseModule
from shinken.external_command import ExternalCommand
from shinken.log import logger

properties = {
    'daemons': ['arbiter', 'receiver'],
    'type': 'collectd',
    'external': True,
    }

DEFAULT_PORT = 25826
DEFAULT_MULTICAST_IP = "239.192.74.66"
BUFFER_SIZE = 4096


[docs]def get_instance(plugin): """ This function is called by the module manager to get an instance of this module """ if hasattr(plugin, "multicast"): multicast = plugin.multicast.lower() in ("yes", "true", "1") else: multicast = False if hasattr(plugin, 'host'): host = plugin.host else: host = DEFAULT_MULTICAST_IP multicast = True if hasattr(plugin, 'port'): port = int(plugin.port) else: port = DEFAULT_PORT if hasattr(plugin, 'grouped_collectd_plugins'): grouped_collectd_plugins = [name.strip() for name in plugin.grouped_collectd_plugins.split(',')] else: grouped_collectd_plugins = [] logger.info("[Collectd] Using host=%s port=%d multicast=%d" % (host, port, multicast)) instance = Collectd_arbiter(plugin, host, port, multicast, grouped_collectd_plugins) return instance
import socket import struct import time from StringIO import StringIO # Collectd message types TYPE_HOST = 0x0000 TYPE_TIME = 0x0001 TYPE_TIME_HR = 0x0008 TYPE_PLUGIN = 0x0002 TYPE_PLUGIN_INSTANCE = 0x0003 TYPE_TYPE = 0x0004 TYPE_TYPE_INSTANCE = 0x0005 TYPE_VALUES = 0x0006 TYPE_INTERVAL = 0x0007 TYPE_INTERVAL_HR = 0x0009 TYPE_MESSAGE = 0x0100 TYPE_SEVERITY = 0x0101 # DS kinds DS_TYPE_COUNTER = 0 DS_TYPE_GAUGE = 1 DS_TYPE_DERIVE = 2 DS_TYPE_ABSOLUTE = 3 header = struct.Struct("!2H") number = struct.Struct("!Q") short = struct.Struct("!H") double = struct.Struct("<d") elements = {}
[docs]def decode_values(pktype, plen, buf): """ Decode values from collectd requests """ nvalues = short.unpack_from(buf, header.size)[0] off = header.size + short.size + nvalues valskip = double.size # check the packet head if ((valskip + 1) * nvalues + short.size + header.size) != plen: return [] if double.size != number.size: return [] result = [] for dstype in map(ord, buf[header.size + short.size:off]): if (dstype == DS_TYPE_COUNTER or dstype == DS_TYPE_DERIVE or dstype == DS_TYPE_ABSOLUTE): v = (dstype, number.unpack_from(buf, off)[0]) result.append(v) off += valskip elif dstype == DS_TYPE_GAUGE: v = (dstype, double.unpack_from(buf, off)[0]) result.append(v) off += valskip else: logger.warning("[Collectd] DS type %i unsupported" % dstype) return result # Get a u64
[docs]def decode_number(pktype, pklen, buf): """ Decode number typed value """ return number.unpack_from(buf, header.size)[0] # Get a simple char
[docs]def decode_string(msgtype, pklen, buf): """ Decode string typed value """ return buf[header.size:pklen-1] # Mapping of message types to decoding functions.
decoder_mapping = { TYPE_VALUES: decode_values, TYPE_TIME: decode_number, TYPE_TIME_HR: decode_number, TYPE_INTERVAL: decode_number, TYPE_INTERVAL_HR: decode_number, TYPE_HOST: decode_string, TYPE_PLUGIN: decode_string, TYPE_PLUGIN_INSTANCE: decode_string, TYPE_TYPE: decode_string, TYPE_TYPE_INSTANCE: decode_string, TYPE_MESSAGE: decode_string, TYPE_SEVERITY: decode_number, }
[docs]def decode_packet(buf): """ decode packet from collectd requests """ off = 0 buflen = len(buf) while off < buflen: pktype, pklen = header.unpack_from(buf, off) if pklen > buflen - off: raise ValueError("Packet too long?") if pktype not in decoder_mapping: raise ValueError("Message type %i not recognized" % pktype) v = decoder_mapping[pktype](pktype, pklen, buf[off:]) yield pktype, v off += pklen
[docs]class Data(list, object): """ This class will transform datas :grouped_collectd_plugins: list of collecd plugins to group """ def __init__(self, grouped_collectd_plugins=[], **kw): self.kind = 0 self.time = 0 self.interval = 0 self.host = None self.plugin = '' self.plugininstance = '' self.type = '' self.typeinstance = '' self.message = '' self.severity = 0 self.values = [] self.grouped_collectd_plugins = grouped_collectd_plugins def __str__(self): """ Return a readable format of a Data object """ return "[%i] %s" % (self.time, self.values)
[docs] def get_srv_desc(self): """ Determine service name from collectd datas """ r = self.plugin if not r in self.grouped_collectd_plugins: if self.plugininstance: r += '-' + self.plugininstance return r
[docs] def get_message(self): """ Get message of a Data object """ return self.message
[docs] def get_kind(self): """ Get kind of a Data object """ return self.kind
[docs] def get_metric_name(self): """ Determine perf data name from collectd datas """ r = self.type if self.plugin in self.grouped_collectd_plugins: if self.plugininstance: r += '-' + self.plugininstance if self.typeinstance: r += '-' + self.typeinstance return r
[docs] def get_metric_values(self): """ Determine perf datas from collectd datas """ if len(self.values) == 0: return None return self.values
[docs] def get_name(self): """ Determine data name from collectd datas """ if not self.host: return None srv_desc = self.get_srv_desc() r = '%s;%s' % (self.host, srv_desc) return r
[docs] def get_time(self): """ Return data time from collectd datas """ return self.time
[docs] def get_message_command(self): """ Return data severity (exit code) from collectd datas """ now = int(time.time()) if self.severity == 4: # OK returncode = 0 elif self.severity == 1: returncode = 2 elif self.severity == 2: returncode = 1 else: returncode = 3 return '[%d] PROCESS_SERVICE_CHECK_RESULT;%s;%s;%d;%s' % (now, self.host, self.get_srv_desc(), returncode, self.message, )
[docs]class CollectdServer(object): """ Collectd server This class listen and and handle collectd requests :host: Bind address :port: Bind port :multicast: Enable multisite :grouped_collectd_plugins: List of collecd plugins to group """ def __init__(self, host, port, multicast, grouped_collectd_plugins=[]): self.host = host self.port = port self.grouped_collectd_plugins = grouped_collectd_plugins logger.info("[Collectd] Opening socket") family, socktype, proto, _, sockaddr = socket.getaddrinfo(None if multicast else self.host, self.port, socket.AF_UNSPEC, socket.SOCK_DGRAM, 0, socket.AI_PASSIVE, )[0] self._sock = socket.socket(family, socktype, proto) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._sock.bind(sockaddr) logger.info("[Collectd] Socket open") if multicast: if hasattr(socket, "SO_REUSEPORT"): self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) val = struct.pack("4sl", socket.inet_aton(self.host), socket.INADDR_ANY) self._sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, val) self._sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 0) logger.info("[Collectd] Socket is opened")
[docs] def interpret_opcodes(self, iterable): """ Decode some stuff from Collectd """ d = Data(self.grouped_collectd_plugins) for kind, data in iterable: d.kind = kind if kind == TYPE_TIME: d.time = data elif kind == TYPE_TIME_HR: d.time = data >> 30 elif kind == TYPE_INTERVAL: d.interval = data elif kind == TYPE_INTERVAL_HR: d.interval = data >> 30 elif kind == TYPE_HOST: d.host = data elif kind == TYPE_PLUGIN: d.plugin = data elif kind == TYPE_PLUGIN_INSTANCE: d.plugininstance = data elif kind == TYPE_TYPE: d.type = data elif kind == TYPE_TYPE_INSTANCE: d.typeinstance = data elif kind == TYPE_SEVERITY: d.severity = data elif kind == TYPE_VALUES: d.values = data yield d elif kind == TYPE_MESSAGE: d.message = data yield d
[docs] def receive(self): """ Read socket """ return self._sock.recv(BUFFER_SIZE)
[docs] def decode(self, buf=None): """ Return a decode packet """ if buf is None: buf = self.receive() return decode_packet(buf)
[docs] def read(self, iterable=None): """ Return a list of decoded packets """ if iterable is None: iterable = self.decode() if isinstance(iterable, basestring): iterable = self.decode(iterable) return self.interpret_opcodes(iterable)
[docs]class Element(object): """ Element store service name and all perfdatas before send it in a external command """ def __init__(self, host_name, sdesc, interval): self.host_name = host_name self.sdesc = sdesc self.perf_datas = {} self.last_update = 0.0 self.interval = interval self.got_new_data = False
[docs] def add_perf_data(self, mname, mvalues, mtime): """ Add perf datas to the message to send to Shinken """ if not mvalues: return r = [] if mname not in self.perf_datas: for (dstype, newrawval) in mvalues: r.append((dstype, newrawval, newrawval, mtime)) else: oldvalues = self.perf_datas[mname] for (olddstype, oldrawval, oldval, oldtime), (dstype, newrawval) in izip(oldvalues, mvalues): difftime = mtime - oldtime if difftime < 1: continue if dstype == DS_TYPE_COUNTER or dstype == DS_TYPE_DERIVE or dstype == DS_TYPE_ABSOLUTE: r.append((dstype, newrawval, (newrawval - oldrawval) / float(difftime), mtime)) elif dstype == DS_TYPE_GAUGE: r.append((dstype, newrawval, newrawval, mtime)) self.perf_datas[mname] = r self.got_new_data = True
[docs] def get_command(self): """ Prepare the external command for Shinken """ if len(self.perf_datas) == 0: return None if not self.got_new_data: return None now = int(time.time()) if now > self.last_update + self.interval: r = '[%d] PROCESS_SERVICE_OUTPUT;%s;%s;CollectD| ' % (now, self.host_name, self.sdesc) for (k, v) in self.perf_datas.iteritems(): for i, w in enumerate(v): if len(v) > 1: r += '%s_%d=%s ' % (k, i, str(w[2])) else: r += '%s=%s ' % (k, str(w[2])) logger.debug('Updating: %s - %s ' % (self.host_name, self.sdesc)) # self.perf_datas.clear() self.last_update = now self.got_new_data = False return r
[docs]class Collectd_arbiter(BaseModule): """ Main class for this collecitd module """ def __init__(self, modconf, host, port, multicast, grouped_collectd_plugins=[]): BaseModule.__init__(self, modconf) self.host = host self.port = port self.multicast = multicast self.grouped_collectd_plugins = grouped_collectd_plugins # When you are in "external" mode, that is the main loop of your process
[docs] def main(self): """ Plugin main loop """ self.set_proctitle(self.name) self.set_exit_handler() try: cs = CollectdServer(self.host, self.port, self.multicast, self.grouped_collectd_plugins) while True: # Each second we are looking at sending old elements for e in elements.values(): c = e.get_command() if c is not None: logger.debug("[Collectd] Got %s" % c) self.from_q.put(ExternalCommand(c)) for item in cs.read(): logger.debug("[Collectd] %s: %s" % (item, item.__dict__)) n = item.get_name() if n and n not in elements: e = Element(item.host, item.get_srv_desc(), item.interval) elements[n] = e e = elements[n] if item.get_kind() == TYPE_VALUES: e.add_perf_data(item.get_metric_name(), item.get_metric_values(), item.get_time()) elif item.get_kind() == TYPE_MESSAGE: c = item.get_message_command() if c is not None: self.from_q.put(ExternalCommand(c)) except Exception, e: logger.error("[Collectd] exception: %s" % str(e)) except ValueError, exp: logger.error("[Collectd] Read error: %s" % exp)