流量監控( Traffic Monitor )

本章針對 「 交換器( Switching Hub ) 」 提到的 OpenFlow 交換器加入流量監控的功能。

定期檢查網路狀態

網路已經成為許多服務或業務的基礎建設,所以維護一個穩定的網路環境是必要的。但是網路問題總是不斷地發生。

網路發生異常的時候,必須快速的找到原因,並且儘速恢復原狀。 這不需要多說,正在閱讀本書的人都知道,找出網路的錯誤、發現真正的原因需要清楚地知道網路的狀態。例如:假設網路中特定的連接埠正處於高流量的狀態,不論是因為他是一個不正常的狀態或是任何原因導致,變成一個由於沒有持續監控所發生的問題。

因此,為了網路的安全以及業務的正常運作,持續注意網路的健康狀況是最基本的工作。當然,網路流量的監視並不能夠保證不會發生任何問題。本章將說明如何使用 OpenFlow 來取得相關的統計資訊。

安裝 Traffic Monitor

接著說明如何在 「 交換器( Switching Hub ) 」 中提到的交換器中加入流量監控的功能。

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub


class SimpleMonitor(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.datapaths:
                self.logger.debug('register datapath: %016x', datapath.id)
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                self.logger.debug('unregister datapath: %016x', datapath.id)
                del self.datapaths[datapath.id]

    def _monitor(self):
        while True:
            for dp in self.datapaths.values():
                self._request_stats(dp)
            hub.sleep(10)

    def _request_stats(self, datapath):
        self.logger.debug('send stats request: %016x', datapath.id)
        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         '
                         'in-port  eth-dst           '
                         'out-port packets  bytes')
        self.logger.info('---------------- '
                         '-------- ----------------- '
                         '-------- -------- --------')
        for stat in sorted([flow for flow in body if flow.priority == 1],
                           key=lambda flow: (flow.match['in_port'],
                                             flow.match['eth_dst'])):
            self.logger.info('%016x %8x %17s %8x %8d %8d',
                             ev.msg.datapath.id,
                             stat.match['in_port'], stat.match['eth_dst'],
                             stat.instructions[0].actions[0].port,
                             stat.packet_count, stat.byte_count)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        body = ev.msg.body

        self.logger.info('datapath         port     '
                         'rx-pkts  rx-bytes rx-error '
                         'tx-pkts  tx-bytes tx-error')
        self.logger.info('---------------- -------- '
                         '-------- -------- -------- '
                         '-------- -------- --------')
        for stat in sorted(body, key=attrgetter('port_no')):
            self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d', 
                             ev.msg.datapath.id, stat.port_no,
                             stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                             stat.tx_packets, stat.tx_bytes, stat.tx_errors)

事實上,流量監控功能已經被實作在 SimpleMonitor 類別中並繼承自 SimpleSwitch13 ,所以這邊已經沒有轉送相關的處理功能了。

固定週期處理

透過 Switching Hub 的平行處理,建立一個執行緒並定期的向交換器發出要求以取得統計的資料。

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub

class SimpleMonitor(simple_switch_13.SimpleSwitch13):

    def __init__(self, *args, **kwargs):
        super(SimpleMonitor, self).__init__(*args, **kwargs)
        self.datapaths = {}
        self.monitor_thread = hub.spawn(self._monitor)

# ...

ryu.lib.hub 中實作了一些 eventlet wrapper 和基本的類別。這裡我們使用 hub.spawn() 建立執行緒。但實際上是使用 eventlet 的 green 執行緒。

# ...

@set_ev_cls(ofp_event.EventOFPStateChange,
            [MAIN_DISPATCHER, DEAD_DISPATCHER])
def _state_change_handler(self, ev):
    datapath = ev.datapath
    if ev.state == MAIN_DISPATCHER:
        if not datapath.id in self.datapaths:
            self.logger.debug('register datapath: %016x', datapath.id)
            self.datapaths[datapath.id] = datapath
    elif ev.state == DEAD_DISPATCHER:
        if datapath.id in self.datapaths:
            self.logger.debug('unregister datapath: %016x', datapath.id)
            del self.datapaths[datapath.id]

def _monitor(self):
    while True:
        for dp in self.datapaths.values():
            self._request_stats(dp)
        hub.sleep(10)

# ...

在執行緒中 _monitor() 方法確保了執行緒可以在每 10 秒的間隔中,不斷地向註冊的交換器發送要求以取得統計資訊。

為了確認連線中的交換器都可以被持續監控, EventOFPStateChange 就可以用來監測交換器的連線中斷。這個事件偵測是 Ryu 框架所提供的功能,會被觸發在 Datapath 的狀態改變時。

當 Datapath 的狀態變成 MAIN_DISPATCHER 時,代表交換器已經註冊並正處於被監視的狀態。而狀態變成 DEAD_DISPATCHER 時代表已經從註冊狀態解除。

# ...

def _request_stats(self, datapath):
    self.logger.debug('send stats request: %016x', datapath.id)
    ofproto = datapath.ofproto
    parser = datapath.ofproto_parser

    req = parser.OFPFlowStatsRequest(datapath)
    datapath.send_msg(req)

    req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
    datapath.send_msg(req)

# ...

定期呼叫 _request_stats() 以驅動 OFPFlowStatsRequestOFPPortStatsRequest 對交換器發出訊息。

OFPFlowStatsRequest 主要用來對交換器的 Flow Entry 取得統計的資料。 對於交換器發出的要求可以使用 table ID、output port、cookie 值和 match 條件來限縮範圍,但是這邊的例子是取得所有的 Flow Entry。

OFPPortStatsRequest 是用來取得關於交換器的連接埠相關資訊以及統計訊息。 使用的時候可以指定連接埠號,這邊使用 OFPP_ANY 目的是要取得所有的連接埠統計資料。

FlowStats

為了接收來自交換器的回應,建立一個 event handler 來接受從交換器發送的 FlowStatsReply 訊息。

# ...

@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
    body = ev.msg.body

    self.logger.info('datapath         '
                     'in-port  eth-dst           '
                     'out-port packets  bytes')
    self.logger.info('---------------- '
                     '-------- ----------------- '
                     '-------- -------- --------')
    for stat in sorted([flow for flow in body if flow.priority == 1],
                       key=lambda flow: (flow.match['in_port'],
                                         flow.match['eth_dst'])):
        self.logger.info('%016x %8x %17s %8x %8d %8d',
                         ev.msg.datapath.id,
                         stat.match['in_port'], stat.match['eth_dst'],
                         stat.instructions[0].actions[0].port,
                         stat.packet_count, stat.byte_count)
# ...

OPFFlowStatsReply 類別的屬性 bodyOFPFlowStats 的列表,當中儲存了每一個 Flow Entry 的統計資訊,並做為 FlowStatsRequest 的回應。

權限為零的 Table-miss Flow 除外的全部 Flow Entry 將會被選擇,通過並符合該 Flow Entry 的封包數和位元數統計資料將會被回傳,並以接收埠號和目的 MAC 位址的方式排序。

為了持續的收集以及分析,僅有一部份的資料會被輸出到 log。因此若要進行分析,連結到外部的程式是必須的。在這樣的情況下 OFPFlowStatsReply 的內容可以被轉換成為 JSON 的格式進行輸出。

可以設定如下格式

import json

# ...

self.logger.info('%s', json.dumps(ev.msg.to_jsondict(), ensure_ascii=True,
                                  indent=3, sort_keys=True))

上述的設定將會產生結果如下

{
   "OFPFlowStatsReply": {
      "body": [
         {
            "OFPFlowStats": {
               "byte_count": 0,
               "cookie": 0,
               "duration_nsec": 680000000,
               "duration_sec": 4,
               "flags": 0,
               "hard_timeout": 0,
               "idle_timeout": 0,
               "instructions": [
                  {
                     "OFPInstructionActions": {
                        "actions": [
                           {
                              "OFPActionOutput": {
                                 "len": 16,
                                 "max_len": 65535,
                                 "port": 4294967293,
                                 "type": 0
                              }
                           }
                        ],
                        "len": 24,
                        "type": 4
                     }
                  }
               ],
               "length": 80,
               "match": {
                  "OFPMatch": {
                     "length": 4,
                     "oxm_fields": [],
                     "type": 1
                  }
               },
               "packet_count": 0,
               "priority": 0,
               "table_id": 0
            }
         },
         {
            "OFPFlowStats": {
               "byte_count": 42,
               "cookie": 0,
               "duration_nsec": 72000000,
               "duration_sec": 57,
               "flags": 0,
               "hard_timeout": 0,
               "idle_timeout": 0,
               "instructions": [
                  {
                     "OFPInstructionActions": {
                        "actions": [
                           {
                              "OFPActionOutput": {
                                 "len": 16,
                                 "max_len": 65509,
                                 "port": 1,
                                 "type": 0
                              }
                           }
                        ],
                        "len": 24,
                        "type": 4
                     }
                  }
               ],
               "length": 96,
               "match": {
                  "OFPMatch": {
                     "length": 22,
                     "oxm_fields": [
                        {
                           "OXMTlv": {
                              "field": "in_port",
                              "mask": null,
                              "value": 2
                           }
                        },
                        {
                           "OXMTlv": {
                              "field": "eth_dst",
                              "mask": null,
                              "value": "00:00:00:00:00:01"
                           }
                        }
                     ],
                     "type": 1
                  }
               },
               "packet_count": 1,
               "priority": 1,
               "table_id": 0
            }
         }
      ],
      "flags": 0,
      "type": 1
   }
}

PortStats

為了接收交換器所回覆的訊息,PortStatsReply 訊息接收的事件接收處理必須要被實作。

# ...

@set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
def _port_stats_reply_handler(self, ev):
    body = ev.msg.body

    self.logger.info('datapath         port     '
                     'rx-pkts  rx-bytes rx-error '
                     'tx-pkts  tx-bytes tx-error')
    self.logger.info('---------------- -------- '
                     '-------- -------- -------- '
                     '-------- -------- --------')
    for stat in sorted(body, key=attrgetter('port_no')):
        self.logger.info('%016x %8x %8d %8d %8d %8d %8d %8d',
                         ev.msg.datapath.id, stat.port_no,
                         stat.rx_packets, stat.rx_bytes, stat.rx_errors,
                         stat.tx_packets, stat.tx_bytes, stat.tx_errors)

OPFPortStatsReply 類別的屬性 body 會列出在 OFPPortStats 中的資料列表。

OFPPortStats 連接埠號儲存接收端的封包數量、位元數量、丟棄封包數量、錯誤數量、frame錯誤數量、overrrun數量、CRC錯誤數量、collection數量等等的統計資訊。

依據連接埠號的排序列出接收的封包數量、接收位元數量、接收錯誤數量、發送封包數量、發送位元數、發送錯誤數量。

執行 Traffic Monitor

接下來實際的執行流量監控。

首先,跟「 交換器( Switching Hub ) 」一樣的執行 Mininet。這邊別忘了交換器的 OpenFlow 版本設定為 OpenFlow13。

下一步,執行流量監控程式。

controller: c0:

ryu@ryu-vm:~# ryu-manager --verbose ./simple_monitor.py
loading app ./simple_monitor.py
loading app ryu.controller.ofp_handler
instantiating app ./simple_monitor.py
instantiating app ryu.controller.ofp_handler
BRICK SimpleMonitor
  CONSUMES EventOFPStateChange
  CONSUMES EventOFPFlowStatsReply
  CONSUMES EventOFPPortStatsReply
  CONSUMES EventOFPPacketIn
  CONSUMES EventOFPSwitchFeatures
BRICK ofp_event
  PROVIDES EventOFPStateChange TO {'SimpleMonitor': set(['main', 'dead'])}
  PROVIDES EventOFPFlowStatsReply TO {'SimpleMonitor': set(['main'])}
  PROVIDES EventOFPPortStatsReply TO {'SimpleMonitor': set(['main'])}
  PROVIDES EventOFPPacketIn TO {'SimpleMonitor': set(['main'])}
  PROVIDES EventOFPSwitchFeatures TO {'SimpleMonitor': set(['config'])}
  CONSUMES EventOFPErrorMsg
  CONSUMES EventOFPPortDescStatsReply
  CONSUMES EventOFPHello
  CONSUMES EventOFPEchoRequest
  CONSUMES EventOFPSwitchFeatures
connected socket:<eventlet.greenio.GreenSocket object at 0x343fb10> address:('127.0.0.1', 55598)
hello ev <ryu.controller.ofp_event.EventOFPHello object at 0x343fed0>
move onto config mode
EVENT ofp_event->SimpleMonitor EventOFPSwitchFeatures
switch features ev version: 0x4 msg_type 0x6 xid 0x7dd2dc58 OFPSwitchFeatures(auxiliary_id=0,capabilities=71,datapath_id=1,n_buffers=256,n_tables=254)
move onto main mode
EVENT ofp_event->SimpleMonitor EventOFPStateChange
register datapath: 0000000000000001
send stats request: 0000000000000001
EVENT ofp_event->SimpleMonitor EventOFPFlowStatsReply
datapath         in-port  eth-dst           out-port packets  bytes
---------------- -------- ----------------- -------- -------- --------
EVENT ofp_event->SimpleMonitor EventOFPPortStatsReply
datapath         port     rx-pkts  rx-bytes rx-error tx-pkts  tx-bytes tx-error
---------------- -------- -------- -------- -------- -------- -------- --------
0000000000000001        1        0        0        0        0        0        0
0000000000000001        2        0        0        0        0        0        0
0000000000000001        3        0        0        0        0        0        0
0000000000000001 fffffffe        0        0        0        0        0        0

在「 交換器( Switching Hub ) 」中,我們使用 ryu-manager 指令來設定 SimpleSwitch13 模組名稱( ryu.app.simple_switch_13 )。 至於這邊則自定 SimpleMonitor 的檔案名稱( ./simple_monitor.py )。

在這個時候 Flow Entry 是空白的( Table-miss Flow Entry 沒有被顯示出來 ),每個連接埠的計數器也都為零。

從 host 1 向 host 2 執行 ping 的指令。

host: h1:

root@ryu-vm:~# ping -c1 10.0.0.2
PING 10.0.0.2 (10.0.0.2) 56(84) bytes of data.
64 bytes from 10.0.0.2: icmp_req=1 ttl=64 time=94.4 ms

--- 10.0.0.2 ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0ms
rtt min/avg/max/mdev = 94.489/94.489/94.489/0.000 ms
root@ryu-vm:~#

封包的轉送、Flow Entry 的註冊狀況,統計資料開始有了變化。

controller: c0:

datapath         in-port  eth-dst           out-port packets  bytes
---------------- -------- ----------------- -------- -------- --------
0000000000000001        1 00:00:00:00:00:02        2        1       42
0000000000000001        2 00:00:00:00:00:01        1        2      140
datapath         port     rx-pkts  rx-bytes rx-error tx-pkts  tx-bytes tx-error
---------------- -------- -------- -------- -------- -------- -------- --------
0000000000000001        1        3      182        0        3      182        0
0000000000000001        2        3      182        0        3      182        0
0000000000000001        3        0        0        0        1       42        0
0000000000000001 fffffffe        0        0        0        1       42        0

Flow Entry 的統計資訊中,在接收埠 1 的 Flow match 流量訊息中,1 個封包,42 個位元組的資訊被記錄下來。接收埠 2 則是 2 個封包,140 個位元組。

連接埠的統計資訊,連接埠 1 的封包接收( rx-pkts )數量為 3,接收位元組( rx-bytes )數量為 102 bytes,連接埠 2 也是 3個封包,182 位元組。

Flow Entry 的統計資訊和連接埠的統計資訊是不可以混為一談的,這是因為 Flow Entry 的統計資訊是記錄 match 的 Entry 所轉送封包的統計資料。也就是說被 Table-miss 觸發的 Packet-In 以及 Packet-Out 轉送封包都不能算在統計資料內。

在這個案例中,host 1 最初的廣播訊息是 ARP request,而 host 2 回覆了 host 1 的 ARP 訊息,host 1 對 host 2 發送了 echo request 總共 3 個封包,這些都是透過觸發 Packet-Out 所傳送的。 因此連接埠的流量會遠大於 Flow Entry 的流量。

本章總結

本章藉由取得統計資料做為題目,嘗試說明下列事項。

  • 產生 Ryu 應用程式執行緒的方法
  • 取得 Datapath 狀態的改變
  • 取得 FlowStats 和 PortStats 資訊的方法