トラフィックモニター

本章では、「スイッチングハブ」で説明したスイッチングハブに、OpenFlowスイッチの統計情報をモニターする機能を追加します。

ネットワークの定期健診

ネットワークは既に多くのサービスや業務のインフラとなっているため、正常で安定した稼働が維持されることが求められます。とは言え、いつも何かしらの問題が発生するものです。

ネットワークに異常が発生した場合、迅速に原因を特定し、復旧させなければなりません。本書をお読みの方には言うまでもないことと思いますが、異常を検出し、原因を特定するためには、日頃からネットワークの状態を把握しておく必要があります。例えば、あるネットワーク機器のポートのトラフィック量が非常に高い値を示していたとして、それが異常な状態なのか、いつもそうなのか、あるいはいつからそうなったのかということは、継続してそのポートのトラフィック量を測っていなければ判断することができません。

というわけで、ネットワークの健康状態を常に監視しつづけるということは、そのネットワークを使うサービスや業務の継続的な安定運用のためにも必須となります。もちろん、トラフィック情報の監視さえしていれば万全などということはありませんが、本章ではOpenFlowによるスイッチの統計情報の取得方法について説明します。

トラフィックモニターの実装

早速ですが、「スイッチングハブ」で説明したスイッチングハブにトラフィックモニター機能を追加したソースコードです。

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor13, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    def _monitor(self):
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(10)

    def _request_stats(self, datapath):
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        for stat in sorted([flow for flow in body if flow.priority == 1],
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

SimpleSwitch13を継承したSimpleMonitor13クラスに、トラフィックモニター機能を実装していますので、ここにはパケット転送に関する処理は出てきません。

定周期処理

スイッチングハブの処理と並行して、定期的に統計情報取得のリクエストをOpenFlowスイッチへ発行するために、スレッドを生成します。

class SimpleMonitor13(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor13, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

# ...

ryu.lib.hubには、いくつかのeventletのラッパーや基本的なクラスの実装があります。ここではスレッドを生成するhub.spawn()を使用します。実際に生成されるスレッドはeventletのグリーンスレッドです。

# ...

@set_ev_cls(ofp_event.EventOFPStateChange,
            [MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
    datapath = ev.datapath
    if ev.state == MAIN_DISPATCHER:
        if datapath.id not in self.datapaths:
            self.logger.debug('register datapath: %016x', datapath.id)
            self.datapaths[datapath.id] = datapath
    elif ev.state == DEAD_DISPATCHER:
        if datapath.id in self.datapaths:
            self.logger.debug('unregister datapath: %016x', datapath.id)
            del self.datapaths[datapath.id]

def _monitor(self):
    while True:
        for dp in self.datapaths.values():
            self._request_stats(dp)
        hub.sleep(10)

# ...

スレッド関数_monitor()では、登録されたスイッチに対する統計情報取得リクエストの発行を10秒間隔で無限に繰り返します。

接続中のスイッチを監視対象とするため、スイッチの接続および切断の検出にEventOFPStateChangeイベントを利用しています。このイベントはRyuフレームワークが発行するもので、Datapathのステートが変わったときに発行されます。

ここでは、DatapathのステートがMAIN_DISPATCHERになった時にそのスイッチを監視対象に登録、DEAD_DISPATCHERになった時に登録の削除を行っています。

def _request_stats(self, datapath):
    self.logger.debug('send stats request: %016x', datapath.id)
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser

    req = parser.OFPFlowStatsRequest(datapath)
    datapath.send_msg(req)

    req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
    datapath.send_msg(req)

定期的に呼び出される_request_stats()では、スイッチにOFPFlowStatsRequestOFPPortStatsRequestを発行しています。

OFPFlowStatsRequestは、フローエントリに関する統計情報をスイッチに要求します。テーブルID、出力ポート、cookie値、マッチの条件などで要求対象のフローエントリを絞ることができますが、ここではすべてのフローエントリを対象としています。

OFPPortStatsRequestは、ポートに関する統計情報をスイッチに要求します。取得したいポートの番号を指定することが出来ます。ここではOFPP_ANYを指定し、すべてのポートの統計情報を要求しています。

FlowStats

スイッチからの応答を受け取るため、FlowStatsReplyメッセージを受信するイベントハンドラを作成します。

@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
    body = ev.msg.body

    self.logger.info('datapath         '
                     'in-port  eth-dst           '
                     'out-port packets  bytes')
    self.logger.info('---------------- '
                     '-------- ----------------- '
                     '-------- -------- --------')
    for stat in sorted([flow for flow in body if flow.priority == 1],
                       key=lambda flow: (flow.match['in_port'],
                                         flow.match['eth_dst'])):
        self.logger.info('%016x %8x %17s %8x %8d %8d',
                         ev.msg.datapath.id,
                         stat.match['in_port'], stat.match['eth_dst'],
                         stat.instructions[0].actions[0].port,
                         stat.packet_count, stat.byte_count)

OPFFlowStatsReplyクラスの属性bodyは、OFPFlowStatsのリストで、FlowStatsRequestの対象となった各フローエントリの統計情報が格納されています。

プライオリティが0のTable-missフローを除いて、全てのフローエントリを選択しています。受信ポートと宛先MACアドレスでソートして、それぞれのフローエントリにマッチしたパケット数とバイト数を出力しています。

なお、ここでは一部の数値をログに出しているだけですが、継続的に情報を収集、分析するには、外部プログラムとの連携が必要になるでしょう。そのような場合、OFPFlowStatsReplyの内容をJSONフォーマットに変換することができます。

例えば次のように書くことができます。

import json

# ...

self.logger.info('%s', json.dumps(ev.msg.to_jsondict(), ensure_ascii=True,
                                  indent=3, sort_keys=True))

この場合、以下のように出力されます。

{
   "OFPFlowStatsReply": {
      "body": [
         {
            "OFPFlowStats": {
               "byte_count": 0,
               "cookie": 0,
               "duration_nsec": 680000000,
               "duration_sec": 4,
               "flags": 0,
               "hard_timeout": 0,
               "idle_timeout": 0,
               "instructions": [
                  {
                     "OFPInstructionActions": {
                        "actions": [
                           {
                              "OFPActionOutput": {
                                 "len": 16,
                                 "max_len": 65535,
                                 "port": 4294967293,
                                 "type": 0
                              }
                           }
                        ],
                        "len": 24,
                        "type": 4
                     }
                  }
               ],
               "length": 80,
               "match": {
                  "OFPMatch": {
                     "length": 4,
                     "oxm_fields": [],
                     "type": 1
                  }
               },
               "packet_count": 0,
               "priority": 0,
               "table_id": 0
            }
         },
         {
            "OFPFlowStats": {
               "byte_count": 42,
               "cookie": 0,
               "duration_nsec": 72000000,
               "duration_sec": 57,
               "flags": 0,
               "hard_timeout": 0,
               "idle_timeout": 0,
               "instructions": [
                  {
                     "OFPInstructionActions": {
                        "actions": [
                           {
                              "OFPActionOutput": {
                                 "len": 16,
                                 "max_len": 65509,
                                 "port": 1,
                                 "type": 0
                              }
                           }
                        ],
                        "len": 24,
                        "type": 4
                     }
                  }
               ],
               "length": 96,
               "match": {
                  "OFPMatch": {
                     "length": 22,
                     "oxm_fields": [
                        {
                           "OXMTlv": {
                              "field": "in_port",
                              "mask": null,
                              "value": 2
                           }
                        },
                        {
                           "OXMTlv": {
                              "field": "eth_dst",
                              "mask": null,
                              "value": "00:00:00:00:00:01"
                           }
                        }
                     ],
                     "type": 1
                  }
               },
               "packet_count": 1,
               "priority": 1,
               "table_id": 0
            }
         }
      ],
      "flags": 0,
      "type": 1
   }
}

PortStats

スイッチからの応答を受け取るため、PortStatsReplyメッセージを受信するイベントハンドラを作成します。

@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
    body = ev.msg.body

    self.logger.info('datapath         port     '
                     'rx-pkts  rx-bytes rx-error '
                     'tx-pkts  tx-bytes tx-error')
    self.logger.info('---------------- -------- '
                     '-------- -------- -------- '
                     '-------- -------- --------')
    for stat in sorted(body, key=attrgetter('port_no')):
        self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                         ev.msg.datapath.id, stat.port_no,
                         stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                         stat.tx_packets, stat.tx_bytes, stat.tx_errors)

OPFPortStatsReplyクラスの属性bodyは、OFPPortStatsのリストになっています。

OFPPortStatsには、ポート番号、送受信それぞれのパケット数、バイト数、ドロップ数、エラー数、フレームエラー数、オーバーラン数、CRCエラー数、コリジョン数などの統計情報が格納されます。

ここでは、ポート番号でソートし、受信パケット数、受信バイト数、受信エラー数、送信パケット数、送信バイト数、送信エラー数を出力しています。

トラフィックモニターの実行

それでは、実際にこのトラフィックモニターを実行してみます。

まず、「スイッチングハブ」と同様にMininetを実行します。ここで、スイッチのOpenFlowバージョンにOpenFlow13を設定することを忘れないでください。

次にいよいよトラフィックモニターの実行です。

controller: c0:

# ryu-manager --verbose ryu.app.simple_monitor_13
loading app ryu.app.simple_monitor_13
loading app ryu.controller.ofp_handler
instantiating app ryu.app.simple_monitor_13 of SimpleMonitor13
instantiating app ryu.controller.ofp_handler of OFPHandler
BRICK SimpleMonitor13
  CONSUMES EventOFPPacketIn
  CONSUMES EventOFPPortStatsReply
  CONSUMES EventOFPStateChange
  CONSUMES EventOFPFlowStatsReply
  CONSUMES EventOFPSwitchFeatures
BRICK ofp_event
  PROVIDES EventOFPPacketIn TO {'SimpleMonitor13': set(['main'])}
  PROVIDES EventOFPPortStatsReply TO {'SimpleMonitor13': set(['main'])}
  PROVIDES EventOFPStateChange TO {'SimpleMonitor13': set(['main', 'dead'])}
  PROVIDES EventOFPFlowStatsReply TO {'SimpleMonitor13': set(['main'])}
  PROVIDES EventOFPSwitchFeatures TO {'SimpleMonitor13': set(['config'])}
  CONSUMES EventOFPPortStatus
  CONSUMES EventOFPSwitchFeatures
  CONSUMES EventOFPEchoReply
  CONSUMES EventOFPPortDescStatsReply
  CONSUMES EventOFPErrorMsg
  CONSUMES EventOFPEchoRequest
  CONSUMES EventOFPHello
connected socket:<eventlet.greenio.base.GreenSocket object at 0x7fbab7189750> address:('127.0.0.1', 37934)
hello ev <ryu.controller.ofp_event.EventOFPHello object at 0x7fbab7179a90>
move onto config mode
EVENT ofp_event->SimpleMonitor13 EventOFPSwitchFeatures
switch features ev version=0x4,msg_type=0x6,msg_len=0x20,xid=0x21014c5c,OFPSwitchFeatures(auxiliary_id=0,capabilities=79,datapath_id=1,n_buffers=256,n_tables=254)
move onto main mode
EVENT ofp_event->SimpleMonitor13 EventOFPStateChange
register datapath: 0000000000000001
send stats request: 0000000000000001
EVENT ofp_event->SimpleMonitor13 EventOFPFlowStatsReply
EVENT ofp_event->SimpleMonitor13 EventOFPPortStatsReply
datapath         in-port  eth-dst           out-port packets  bytes
---------------- -------- ----------------- -------- -------- --------
datapath         port     rx-pkts  rx-bytes rx-error tx-pkts  tx-bytes tx-error
---------------- -------- -------- -------- -------- -------- -------- --------
0000000000000001        1        0        0        0        0        0        0
0000000000000001        2        0        0        0        0        0        0
0000000000000001        3        0        0        0        0        0        0
0000000000000001 fffffffe        0        0        0        0        0        0

スイッチングハブ」では、ryu-managerコマンドにSimpleSwitch13のモジュール名(ryu.app.example_switch_13)を指定しましたが、ここでは、SimpleMonitor13のモジュール名(ryu.app.simple_monitor_13)を指定しています。

この時点では、フローエントリが無く(Table-missフローエントリは表示していません)、各ポートのカウントもすべて0です。

ホスト1からホスト2へpingを実行してみましょう。

host: h1:

# ping -c1 10.0.0.2
PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
64 bytes from 10.0.0.2: icmp_req=1 ttl=64 time=94.4 ms

--- 10.0.0.2 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 94.489/94.489/94.489/0.000 ms
#

パケットの転送や、フローエントリが登録され、統計情報が変化します。

controller: c0:

datapath         in-port  eth-dst           out-port packets  bytes
---------------- -------- ----------------- -------- -------- --------
0000000000000001        1 00:00:00:00:00:02        2        1       42
0000000000000001        2 00:00:00:00:00:01        1        2      140
datapath         port     rx-pkts  rx-bytes rx-error tx-pkts  tx-bytes tx-error
---------------- -------- -------- -------- -------- -------- -------- --------
0000000000000001        1        3      182        0        3      182        0
0000000000000001        2        3      182        0        3      182        0
0000000000000001        3        0        0        0        1       42        0
0000000000000001 fffffffe        0        0        0        1       42        0

フローエントリの統計情報では、受信ポート1のフローにマッチしたトラフィックは、1パケット、42バイトと記録されています。受信ポート2では、2パケット、140バイトとなっています。

ポートの統計情報では、ポート1の受信パケット数(rx-pkts)は3、受信バイト数(rx-bytes)は182バイト、ポート2も3パケット、182バイトとなっています。

フローエントリの統計情報とポートの統計情報で数字が合っていませんが、これはフローエントリの統計情報は、そのエントリにマッチし転送されたパケットの情報だからです。つまり、Table-missによりPacket-Inを発行し、Packet-Outで転送されたパケットは、この統計の対象になっていないためです。

このケースでは、ホスト1が最初にブロードキャストしたARPリクエスト、ホスト2がホスト1に返したARPリプライ、ホスト1がホスト2へ発行したecho requestの3パケットが、Packet-Outによって転送されています。そのため、ポートの統計量は、フローエントリの統計量よりも多くなっています。

まとめ

本章では、統計情報の取得機能を題材として、以下の項目について説明しました。

  • Ryuアプリケーションでのスレッドの生成方法
  • Datapathの状態遷移の捕捉
  • FlowStatsおよびPortStatsの取得方法