计算机网络管理复习
第一章 网络管理概论
1.1 网络管理基本概念
1.1.1 网络管理定义
网络管理是指对网络的运行状态进行监测和控制,并能提供有效、可靠、安全、经济的服务。网络管理完成两个核心任务:
- 网络监视:对网络的运行状态进行监视,了解网络当前状态是否正常
- 网络控制:对网络的运行进行控制,对网络资源进行合理分配,优化网络性能
1.1.2 网络管理目标
网络管理的目标是使网络的性能达到最优化,最大限度地满足网络管理者和网络用户对计算机网络的要求:
- 有效性:网络能够高效地传输数据
- 可靠性:网络运行稳定,故障率低
- 开放性:支持多种标准和协议
- 综合性:能够管理各种网络设备和服务
- 安全性:保护网络免受攻击和非法访问
- 经济性:以合理的成本提供网络服务
1.1.3 网络管理对象
被管对象是对网络资源的抽象表示,主要分为两类:
硬件资源:
- 物理介质:网卡、双绞线、同轴电缆、光纤等
- 计算机设备:处理机、打印机、存储设备等
- 网络互连设备:中继器、网桥、交换机、路由器、网关等
软件资源:
- 操作系统
- 应用软件
- 通信软件:实现通信协议的软件,如FDDI、ATM等
- 网络设备软件:路由器软件、网桥软件、交换机软件等
1.2 网络管理系统架构
1.2.1 网络管理系统组成要素(🙂)
现代网络管理系统由四个核心要素组成:
-
网络管理者(Network Manager)
- 也称网络管理站或管理进程
- 位于网络系统的主干或接近主干位置的工作站、微机等
-
网络代理(Managed Agent)
- 位于被管理设备内部的代理程序
- 负责转换管理者命令和信息
-
网络管理协议(Network Management Protocol,NMP)
- 用于网络管理者和管理代理之间传递信息
- 完成信息交换安全控制的通信规约
-
管理信息库(Management Information Base,MIB)
- 对通过网络管理协议可以访问信息的精确定义
- 是一个信息存储库
1.2.2 网络管理系统层次结构
网络管理系统主要分为管理站和代理两部分,其层次结构从下到上包括:
- 操作系统和计算机硬件
- 通信协议栈:OSI、TCP/IP等通信协议,以及SNMP、CMIP等网络管理协议
- 网络管理框架(Network Management Framework)
- 网络管理应用(Network Management Application)
网络管理框架提供的功能:
- 为存储管理信息提供数据库支持
- 提供用户接口和用户视图功能
- 提供基本的管理操作
1.2.3 网络管理系统配置
集中式管理:
- 网络中至少有一个结点担当管理站角色
- 管理站包含网络管理应用(NMA)
- 优点:管理人员可以有效控制整个网络资源
- 缺点:对于大型网络显得力不从心
分布式管理:
- 多个管理站协同工作
- 适合大型网络环境
- 符合分布式计算模型发展趋势
委托代理(Proxy):
- 用于管理不支持标准网络管理协议的设备
- 委托代理与非标准设备之间运行专用协议
- 委托代理与管理站之间运行标准协议
- 起到协议转换的作用
1.3 网络管理功能域
ISO将网络系统管理任务划分为5个功能域(FCAPS模型):
1.3.1 故障管理(Fault Management)
目标:尽快发现故障,找出故障原因,及时采取补救措施
功能模块:
-
故障检测和报警功能
- 记录系统出错情况和可能引起故障的事件
- 存储在运行日志数据库中
- 主动发送出错事件报告
-
故障预测功能
- 对可能引起故障的参数建立门限值
- 监视参数值变化
- 超过门限值时发送警报
-
故障诊断和定位功能
- 对设备和通信线路进行测试
- 找出故障原因和故障地点
- 包括连接测试、数据完整性测试、协议完整性测试等
1.3.2 配置管理(Configuration Management)
目标:设备初始化、维护和关闭网络设备或子系统
主要功能:
- 定义配置信息
- 设置和修改设备属性
- 定义和修改网络元素间的互联关系
- 启动和终止网络运行
- 发行软件
- 检查参数值和互联关系
- 报告配置现状
1.3.3 性能管理(Performance Management)
基本功能:
-
实时数据采集
- 跟踪系统、网络或业务情况
- 监测流量、负载、丢包、温度、内存、延迟等性能指标
- 可设置数据采集间隔
-
数据分析和统计
- 对实时数据进行分析
- 判断是否处于正常水平
- 自动形成管理报表
- 以图形方式显示网络性能状况
-
运行日志维护
- 维护并检查系统运行日志
-
性能预警
- 为重要指标设定阈值
- 超过阈值时发出警报
-
性能分析报告
- 生成性能趋势曲线
- 生成性能统计分析报表
1.3.4 计费管理(Accounting Management)
功能:
- 计算网络建设及运营成本
- 统计网络资源利用率,确定计费标准
- 通知用户应缴费用
- 支持用户费用上限设置
- 保存收费账单及原始数据
计费类型:
- 基于网络流量计费:根据用户网络流量收费
- 基于使用时间计费:根据使用时间长短收费
- 基于网络服务计费:根据使用的网络服务收费
1.3.5 安全管理(Security Management)
目标:确保网络资源不被非法使用,防止攻击破坏
主要内容:
- 安全信息分发(密钥分发、访问权设置等)
- 安全通知(非法侵入、无权访问等)
- 安全服务措施的创建、控制和删除
- 安全相关事件的记录、维护和查询
涉及方面:
- 安全信息维护
- 资源访问控制
- 网络安全技术
1.4 网络监控技术
1.4.1 管理信息分类
对网络监控有用的管理信息分为3类:
-
静态信息
- 系统和网络的配置信息
- 例如:路由器端口数、工作站标识、CPU类型等
- 不经常变化
-
动态信息
- 与网络事件和设备工作状态相关
- 例如:传送的分组数、网络连接状态等
- 实时变化
-
统计信息
- 从动态信息推导出的信息
- 例如:平均每分钟发送的分组数、传输失败概率等
1.4.2 通信机制
代理和监视器之间有两种通信技术:
-
轮询(Polling)
- 请求-响应式交互
- 监视器向代理发出请求
- 代理响应并返回信息
-
事件报告(Event Reporting)
- 代理主动发送消息给管理站
- 可定时发送状态报告
- 可在检测到特定事件时发送报告
1.5 性能监视
1.5.1 性能指标分类
面向服务的性能指标:
-
可用性(Availability)
- 网络系统对用户可利用时间的百分比
- 计算公式:A = MTBF/(MTBF + MTTR)
- MTBF:平均无故障时间
- MTTR:平均维修时间
-
响应时间(Response Time)
- 从用户输入请求到系统返回结果的时间间隔
- 对用户生产率影响很大
- 响应时间小于1秒时事务处理速率明显加快
-
正确性(Correctness)
- 网络传输的正确性
- 监视传输误码率
- 发现线路故障和通信干扰
面向效率的性能指标:
-
吞吐率(Throughput)
- 一段时间内完成的数据处理量
- 接受用户会话的数量
- 处理呼叫的数量
-
利用率(Utilization)
- 网络资源利用的百分率
- 与网络负载相关
- 负载增加时利用率增大,响应时间变长
1.5.2 网络利用率分析方法
基本思想:观察链路实际通信量与规划容量的比较
分析步骤:
- 计算各链路负载占总负载的百分率(相对负载)
- 计算各链路容量占总容量的百分率(相对容量)
- 计算相对负载与相对容量的比值(相对利用率)
- 根据相对利用率调整链路容量配置
1.6 网络控制
1.6.1 配置控制
配置管理功能模块详解:
-
定义配置信息
- 描述网络资源的特征和属性
- 包括物理资源和逻辑资源
-
设置和修改属性
- 限制条件:
- 只有授权管理站才可以修改
- 硬件配置信息不可改变
- 修改类型:
- 只修改数据库
- 修改数据库并改变设备状态
- 修改数据库并引起设备动作
- 限制条件:
-
定义和修改互联关系
- 网络资源之间的联系和连接
- 包括拓扑结构、物理连接、逻辑连接等
- 支持联机修改
-
启动和终止网络运行
- 验证资源属性设置
- 发送确认应答
- 允许检索统计信息
-
发行软件
- 向端系统和中间系统发行软件
- 装载指定软件
- 更新软件版本
- 配置软件参数
1.6.2 安全控制
安全需求:
- 保密性(Secrecy):信息只能由授权用户读取
- 数据完整性(Integrity):信息只能被授权用户修改
- 可用性(Availability):授权用户可以正常使用网络资源
安全威胁类型:
- 中断(Interruption):破坏可用性
- 窃取(Interception):破坏保密性
- 篡改(Modification):破坏完整性
- 假冒(Fabrication):破坏完整性
对网络管理的安全威胁:
- 管理系统失灵
- 发出错误管理指令
- 破坏网络正常运行
1.7 网络管理标准
1.7.1 主要标准体系
-
SNMP体系结构
- 基于TCP/IP参考模型
- 优点:简单性和实用性
- 广泛应用于Internet管理
-
CMIP体系结构
- 基于OSI参考模型
- 优点:通用性和完备性
- 应用于电信网管理标准(TMN)
1.7.2 网络管理平台
市场上的商用网络管理系统:
- 主机厂家产品:IBM NetView、HP OpenView
- 网络产品制造商产品:Cisco Works2000、Cabletron Spectrum
1.8 小结
网络管理概论涵盖了网络管理的基本概念、系统架构、功能域、监控技术和标准体系。重点掌握:
- 网络管理的定义和目标
- 网络管理系统的四个核心要素
- FCAPS五大功能域的具体内容
- 性能监视的关键指标和分析方法
- 网络控制的配置管理和安全管理
- SNMP和CMIP两大标准体系的特点
第二章 抽象语法表示ASN.1
2.1 网络数据表示概述
2.1.1 ASN.1基本概念
**抽象语法表示ASN.1(Abstract Syntax Notation One)**是一种形式语言,用于定义应用数据的抽象语法和应用协议数据单元的结构。
**基本编码规则BER(Basic Encoding Rule)**是与ASN.1配套的编码规则,用于将ASN.1定义的应用数据转换成比特串在网络中传输。
2.1.2 表示层功能
在OSI/RM框架中,表示层提供统一的网络数据表示。表示实体定义应用数据的抽象语法,类似于程序设计语言的抽象数据类型。
数据转换过程:
- 应用协议按预定义抽象语法构造协议数据单元
- 表示实体对应用层数据进行编码,转换成二进制比特串
- 比特串由传输实体在网络中传送
- 在各端系统内部,应用数据映像成本地特殊形式
2.2 ASN.1语法规则
2.2.1 文本约定(Lexical Conventions)
- 布局规则:多个空格和空行等效于一个空格
- 标识符规则:
- 标识符以小写字母开头
- 类型指针和模块名以大写字母开头
- ASN.1内部类型全部用大写字母表示
- 关键字全部用大写字母表示
- 注释规则:以一对短线(--)开始,以一对短线或行尾结束
2.2.2 主要符号
::= 定义为(产生式)
{} 集合或序列的界定符
[] 可选项或标签的界定符
() 子类型或值的界定符
| 选择符
.. 范围符
-- 注释符
2.3 数据类型系统
2.3.1 标签系统
每个数据类型都有一个标签(Tag),标签有类型和值,数据类型由标签的类型和值唯一决定。
标签类型:
- 通用标签(UNIVERSAL):标准定义的类型,适用于任何应用
- 应用标签(APPLICATION):具体应用定义的类型
- 上下文专用标签(CONTEXT SPECIFIC):用符号[n]表示,在特定范围内适用
- 私有标签(PRIVATE):用户定义的标签
2.3.2 数据类型分类
1. 简单类型 由单一成分构成的原子类型,包括:
基本类型组:
- BOOLEAN:布尔类型,值为TRUE或FALSE
- INTEGER:整数类型,可定义命名值
- BIT STRING:比特串类型
- OCTET STRING:字节串类型
- REAL:实数类型,表示为M × B^E(M和E为整数,B为2或10)
- ENUMERATED:枚举类型,值为命名的整数
-- 枚举类型示例
EthernetAdapterStatus ::= ENUMERATED {
normal(0),
degraded(1),
offline(2),
failed(3)
}
-- 整数类型示例
EthernetNumberCollisionsRange ::= INTEGER {
minimum(0),
maximum(1000)
}
字符串类型组:
- NumericString、PrintableString、T61String等
- 都是OCTET STRING的子集
对象标识符类型组:
- OBJECT IDENTIFIER:对象标识符,由整数序列组成
- Object Descriptor:对象描述符,人工可读形式
-- 对象标识符示例
internet OBJECT IDENTIFIER ::= {
iso(1) org(3) dod(6) 1
}
-- 值为:1.3.6.1 或 iso.org.dod.1
特殊类型组:
- NULL:空类型,无值
- EXTERNAL:外部类型
- UTCTime:世界通用时(YYMMDD格式)
- GeneralizedTime:通用时间(YYYYMMDD格式)
2. 构造类型 由多个成分构成的类型:
序列类型(SEQUENCE):
- 元素有序
- 可包含不同类型元素
- 支持OPTIONAL和DEFAULT
-- 序列类型示例
Person ::= SEQUENCE {
name IA5String,
age INTEGER OPTIONAL,
address IA5String DEFAULT "Unknown"
}
-- 序列值示例
person Person ::= {name "Tony", age 25}
序列OF类型(SEQUENCE OF):
-- 相同类型元素的序列
Seats ::= SEQUENCE OF INTEGER
seats Seats ::= {20, 30, 40}
集合类型(SET):
- 元素无序
- 其他特性与SEQUENCE相同
-- 集合类型示例
Person ::= SET {
age INTEGER,
name IA5String
}
person Person ::= {25, "Tony"} -- 顺序可变
集合OF类型(SET OF):
VipSeats ::= SET OF INTEGER
vipseats VipSeats ::= {60, 80, 120}
3. 标签类型 给现有类型加上新标签:
隐含标签(IMPLICIT):
- 用新标签替换老标签
- 编码时只编码新标签
- 产生较短编码
明示标签(EXPLICIT):
- 在基类型上加新标签
- 编码时新老标签都编码
-- 标签类型示例
Parentage ::= SET {
subjectName [1] IMPLICIT IA5String,
motherName [2] IMPLICIT IA5String OPTIONAL,
fatherName [3] IMPLICIT IA5String OPTIONAL
}
4. 其他类型
CHOICE类型:
- 可选类型的联合
- 运行时确定具体类型
-- CHOICE类型示例
Prize ::= CHOICE {
car IA5String,
cash INTEGER,
nothing BOOLEAN
}
ANY类型:
- 表示任意类型的任意值
- 实际类型也未知
-- ANY类型示例
SoftwareVersion ::= ANY
-- 或
SoftwareVersion ::= ANY DEFINED BY INTEGER
2.4 子类型
2.4.1 子类型概念
子类型是通过限制父类型的值集合而导出的类型,子类型的值集合是父类型的子集。
2.4.2 产生子类型的方法
1. 单个值 列出子类型可取的各个值:
SmallPrime ::= INTEGER (2|3|5|7|11|13|17|19|23|29)
2. 包含子类型 使用INCLUDES关键字:
First-half ::= Months (INCLUDES First-quarter | INCLUDES Second-quarter)
3. 值区间 适用于整数和实数类型:
PositiveInteger ::= INTEGER (0<..PLUS-INFINITY)
NegativeInteger ::= INTEGER (MINUS-INFINITY..<0)
4. 可用字符 限制字符串可使用的字符集:
TouchToneButtons ::= IA5String (FROM("0"|"1"|"2"|"3"|"4"|"5"|"6"|"7"|"8"|"9"|"*"|"#"))
DigitString ::= IA5String (FROM("0"|"1"|"2"|"3"|"4"|"5"|"6"|"7"|"8"|"9"))
5. 限制大小 限制比特串、字节串长度或序列、集合元素个数:
X25DataNumber ::= DigitString (SIZE(5..14))
ParameterList ::= SET SIZE(0..12) OF Parameter
6. 内部子类型 用于序列、集合和CHOICE的子类型:
TestPDU ::= PDU (WITH COMPONENTS {
...,
delta(FALSE),
alpha(MIN..<0)
})
2.5 基本编码规则(BER)
2.5.1 TLV结构
BER将ASN.1抽象类型值编码为字节串,采用**类型标签-长度-值(TLV)**结构:
+----------+----------+----------+
| Type Tag | Length | Value |
+----------+----------+----------+
2.5.2 类型标签字节结构
+---+---+---+---+---+---+---+---+
| 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
+---+---+---+---+---+---+---+---+
| 类型 |构造| 标签值 |
| (2位) |标志| (5位) |
+-------+----+------------------+
- 位7-6:标签类型(00=UNIVERSAL, 01=APPLICATION, 10=CONTEXT, 11=PRIVATE)
- 位5:构造标志(0=简单类型, 1=构造类型)
- 位4-0:标签值(如果≥31,则这5位全为1,实际值在后续字节中)
2.5.3 编码示例
示例1:布尔类型编码
FALSE: 01 01 00
TRUE: 01 01 FF
- 01:UNIVERSAL 1(布尔类型)
- 01:长度为1字节
- 00/FF:值
示例2:整数编码
十进制256: 02 02 01 00
- 02:UNIVERSAL 2(整数类型)
- 02:长度为2字节
- 01 00:值(256的二进制表示)
示例3:比特串编码
比特串10101: 03 02 03 A8
- 03:UNIVERSAL 3(比特串类型)
- 02:长度为2字节
- 03:未使用的比特数
- A8:值(10101000)
示例4:序列编码
SEQUENCE{madeofwood TRUE, length 62}:
30 06 01 01 FF 02 01 3E
- 30:UNIVERSAL 16, 构造类型(序列)
- 06:总长度6字节
- 01 01 FF:布尔值TRUE
- 02 01 3E:整数值62
2.5.4 长度编码规则
短格式(长度≤127):
- 最高位为0,其余7位表示长度值
长格式(长度>127):
- 第一字节最高位为1,其余7位表示后续字节数
- 后续字节表示实际长度值
不定长格式:
- 用于构造类型
- 长度字段为80,内容以00 00结束
2.6 ASN.1宏定义
2.6.1 宏的概念
宏是ASN.1的扩展机制,允许定义新的语法结构和语义规则。
2.6.2 宏定义语法
MACRO-NAME MACRO ::= BEGIN
TYPE NOTATION ::= type-notation
VALUE NOTATION ::= value-notation
supporting-productions
END
2.6.3 常用宏示例
OBJECT-TYPE宏(用于SNMP MIB定义):
OBJECT-TYPE MACRO ::= BEGIN
TYPE NOTATION ::=
"SYNTAX" type(Syntax)
"ACCESS" Access
"STATUS" Status
"DESCRIPTION" Text
"INDEX" "{" IndexTypes "}"
"DEFVAL" "{" Defval "}"
VALUE NOTATION ::= value(VALUE ObjectName)
Access ::= "read-only" | "read-write" | "write-only" | "not-accessible"
Status ::= "mandatory" | "optional" | "obsolete" | "deprecated"
END
2.7 实际应用示例
2.7.1 个人记录示例
-- 个人记录定义
PersonnelRecord ::= [APPLICATION 0] IMPLICIT SET {
name Name,
title [0] VisibleString,
number EmployeeNumber,
dateOfHire [1] Date,
nameOfSpouse [2] Name,
children [3] IMPLICIT SEQUENCE OF ChildInformation DEFAULT {}
}
Name ::= [APPLICATION 1] IMPLICIT SEQUENCE {
givenName VisibleString,
initial VisibleString,
familyName VisibleString
}
EmployeeNumber ::= [APPLICATION 2] IMPLICIT INTEGER
Date ::= [APPLICATION 3] IMPLICIT VisibleString -- YYYYMMDD
ChildInformation ::= SET {
name Name,
dateOfBirth [0] Date
}
2.7.2 网络管理对象示例
-- 以太网接口对象
EthernetInterface ::= SEQUENCE {
ifIndex INTEGER,
ifDescr DisplayString,
ifType INTEGER,
ifMtu INTEGER,
ifSpeed Gauge,
ifPhysAddress PhysAddress,
ifAdminStatus INTEGER {
up(1),
down(2),
testing(3)
},
ifOperStatus INTEGER {
up(1),
down(2),
testing(3)
}
}
2.8 编程实践
2.8.1 Python ASN.1编程示例
安装ASN.1库:
pip install pyasn1
基本类型编码示例:
from pyasn1.type import univ, namedtype, tag
from pyasn1.codec.der import encoder, decoder
# 定义简单类型
boolean_val = univ.Boolean(True)
integer_val = univ.Integer(256)
octet_string_val = univ.OctetString(b'Hello')
# 编码
encoded_bool = encoder.encode(boolean_val)
encoded_int = encoder.encode(integer_val)
encoded_str = encoder.encode(octet_string_val)
print(f"Boolean编码: {encoded_bool.hex()}")
print(f"Integer编码: {encoded_int.hex()}")
print(f"OctetString编码: {encoded_str.hex()}")
# 解码
decoded_bool, _ = decoder.decode(encoded_bool, asn1Spec=univ.Boolean())
decoded_int, _ = decoder.decode(encoded_int, asn1Spec=univ.Integer())
decoded_str, _ = decoder.decode(encoded_str, asn1Spec=univ.OctetString())
print(f"Boolean解码: {decoded_bool}")
print(f"Integer解码: {decoded_int}")
print(f"OctetString解码: {decoded_str}")
序列类型编码示例:
from pyasn1.type import univ, namedtype
# 定义Person序列类型
class Person(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('name', univ.UTF8String()),
namedtype.NamedType('age', univ.Integer())
)
# 创建Person实例
person = Person()
person.setComponentByName('name', 'Tony')
person.setComponentByName('age', 25)
# 编码
encoded_person = encoder.encode(person)
print(f"Person编码: {encoded_person.hex()}")
# 解码
decoded_person, _ = decoder.decode(encoded_person, asn1Spec=Person())
print(f"姓名: {decoded_person.getComponentByName('name')}")
print(f"年龄: {decoded_person.getComponentByName('age')}")
CHOICE类型编码示例:
from pyasn1.type import univ, namedtype, char
# 定义Prize CHOICE类型
class Prize(univ.Choice):
componentType = namedtype.NamedTypes(
namedtype.NamedType('car', char.UTF8String()),
namedtype.NamedType('cash', univ.Integer()),
namedtype.NamedType('nothing', univ.Boolean())
)
# 创建不同的Prize实例
car_prize = Prize()
car_prize.setComponentByName('car', 'BMW')
cash_prize = Prize()
cash_prize.setComponentByName('cash', 10000)
# 编码
encoded_car = encoder.encode(car_prize)
encoded_cash = encoder.encode(cash_prize)
print(f"Car prize编码: {encoded_car.hex()}")
print(f"Cash prize编码: {encoded_cash.hex()}")
2.8.2 ASN.1编译器使用
使用asn1c编译器:
# 安装asn1c
sudo apt-get install asn1c
# 编译ASN.1文件
asn1c -fcompound-names PersonnelRecord.asn1
# 生成的C代码文件
ls *.c *.h
C语言编程示例:
[[include]] "PersonnelRecord.h"
[[include]] <stdio.h>
int main() {
PersonnelRecord_t *record;
Name_t *name;
// 创建记录
record = calloc(1, sizeof(PersonnelRecord_t));
name = calloc(1, sizeof(Name_t));
// 设置姓名
OCTET_STRING_fromString(&name->givenName, "John");
OCTET_STRING_fromString(&name->initial, "D");
OCTET_STRING_fromString(&name->familyName, "Smith");
record->name = *name;
// 设置员工编号
record->number = 12345;
// 编码
asn_enc_rval_t ec;
ec = der_encode_to_buffer(&asn_DEF_PersonnelRecord, record, buffer, sizeof(buffer));
if(ec.encoded == -1) {
printf("编码失败\n");
} else {
printf("编码成功,长度: %ld\n", ec.encoded);
}
return 0;
}
2.9 小结
ASN.1抽象语法表示是网络管理中的重要基础技术,主要知识点包括:
- 基本概念:ASN.1提供统一的网络数据表示,BER提供编码规则
- 类型系统:简单类型、构造类型、标签类型、其他类型
- 子类型机制:通过限制父类型值集合产生子类型
- 编码规则:TLV结构,类型标签-长度-值的编码方式
- 宏定义:扩展ASN.1语法的机制
- 实际应用:在SNMP MIB定义中广泛使用
掌握ASN.1对于理解网络管理协议和MIB定义至关重要。
第三章 管理信息库MIB-2
3.1 SNMP基本概念
3.1.1 TCP/IP协议簇
TCP/IP协议簇与OSI/RM的对应关系:
- 应用层:FTP、TELNET、SMTP、SNMP等
- 传输层:TCP、UDP
- 网络层:IP、ICMP、IGMP、ARP、RARP
- 网络访问层:各种局域网和广域网协议
TCP/IP特点:
- 允许同层协议实体之间互相作用
- 允许上层过程直接调用不相邻的下层过程
- 控制信息和数据可分别传输
3.1.2 TCP/IP网络管理框架
SNMPv1框架由四个RFC文档定义:
-
RFC 1155:管理信息结构(SMI)
- 规定管理对象的语法和语义
- 说明如何定义和访问管理对象
-
RFC 1212:MIB模块定义方法
- 定义MIB模块的标准方法
-
RFC 1213:MIB-2核心对象集合
- 定义任何SNMP系统必须实现的管理对象
-
RFC 1157:SNMPv1协议规范
- 定义SNMP协议的具体实现
3.1.3 SNMP协议体系结构
协议栈结构:
+------------------+
| 管理应用程序 |
+------------------+
| SNMP |
+------------------+
| UDP |
+------------------+
| IP |
+------------------+
| 网络访问层 |
+------------------+
选择UDP的原因:
- 效率较高,不会过多增加网络负载
- 实现简单
- 缺点:不可靠,报文容易丢失
实现建议:
- 每个管理信息装配成单独数据报发送
- 报文应短些,不超过484字节
团体关系(Community):
- 团体名作为全局标识符
- 简单的身份认证手段
- 代理不接受未通过团体名验证的报文
委托代理(Proxy Agent):
- 用于管理不支持TCP/IP的设备
- 一个委托代理可管理多台非TCP/IP设备
- 代表这些设备接收管理站查询
3.2 MIB树结构
3.2.1 层次树结构的作用
1. 表示管理和控制关系
- 上层中间结点是组织机构名
- 说明机构负责下面子树信息的管理和审批
- 例如:org(3)由ISO代管,internet(1)由IAB代管
2. 提供结构化信息组织技术
- 下层中间结点代表与网络资源或协议相关的信息集合
- 例如:IP协议管理信息都在ip(4)子树中
- 便于沿树层次访问相关信息
3. 提供对象命名机制
- 每个结点都有分层编号
- 叶子结点代表实际管理对象
- 从树根到树叶的编号串联形成全局标识
- 例如:internet标识符为1.3.6.1或{iso(1) org(3) dod(6) 1}
3.2.2 MIB-2分组结构
MIB-2包含以下主要功能组:
- system(1):系统信息
- interfaces(2):接口信息
- at(3):地址转换(已废弃)
- ip(4):IP协议信息
- icmp(5):ICMP协议信息
- tcp(6):TCP协议信息
- udp(7):UDP协议信息
- egp(8):EGP协议信息
- transmission(10):传输介质信息
- snmp(11):SNMP协议信息
3.3 MIB数据类型
3.3.1 ASN.1通用类型
MIB中使用ASN.1的5种通用类型:
- INTEGER:整数类型
- OCTET STRING:字节串类型
- OBJECT IDENTIFIER:对象标识符类型
- NULL:空类型
- SEQUENCE:序列类型(构造类型)
3.3.2 应用类型
RFC 1155定义了6种应用类型:
1. NetworkAddress
NetworkAddress ::= CHOICE {
internet IpAddress
}
- 网络地址选择类型
- 目前只有IP地址一种
2. IpAddress
IpAddress ::= [APPLICATION 0] IMPLICIT OCTET STRING (SIZE(4))
- 32位IP地址
- 定义为4字节的OCTET STRING
3. Counter
Counter ::= [APPLICATION 1] IMPLICIT INTEGER (0..4294967295)
- 计数器类型,非负整数
- 值只能增加,不能减少
- 达到最大值2³²-1后回零
- 用于计算接收分组数、字节数等
4. Gauge
Gauge ::= [APPLICATION 2] IMPLICIT INTEGER (0..4294967295)
- 计量器类型,非负整数
- 值可增加也可减少
- 达到最大值后锁定在2³²-1
- 用于表示缓冲队列中的分组数
5. TimeTicks
TimeTicks ::= [APPLICATION 3] IMPLICIT INTEGER (0..4294967295)
- 时钟类型,非负整数
- 单位是百分之一秒(centiseconds)
- 表示从某事件开始到目前经过的时间
6. Opaque
Opaque ::= [APPLICATION 4] IMPLICIT OCTET STRING
- 不透明类型,未知数据类型
- 可表示任意类型
- 编码时按OCTET STRING处理
3.4 管理信息结构定义(<font color="#f79646">不考</font>)
3.4.1 OBJECT-TYPE宏定义
RFC 1212定义的对象类型宏:
OBJECT-TYPE MACRO ::= BEGIN
TYPE NOTATION ::=
"SYNTAX" type(TYPE ObjectSyntax)
"ACCESS" Access
"STATUS" Status
DescrPart
ReferPart
IndexPart
DefValPart
VALUE NOTATION ::= value(VALUE ObjectName)
Access ::= "read-only" | "read-write" | "write-only" | "not-accessible"
Status ::= "mandatory" | "optional" | "obsolete" | "deprecated"
DescrPart ::= "DESCRIPTION" value(description DisplayString) | empty
ReferPart ::= "REFERENCE" value(reference DisplayString) | empty
IndexPart ::= "INDEX" "{" IndexTypes "}"
DefValPart ::= "DEFVAL" "{" value(defvalue ObjectSyntax) "}" | empty
END
3.4.2 关键字段说明
SYNTAX:
- 对象类型的抽象语法
- 可以是ASN.1通用类型或应用类型之一
ACCESS:
- read-only:只读
- read-write:读写
- write-only:只写
- not-accessible:不可访问(用于表索引)
STATUS:
- mandatory:必要的,必须实现
- optional:任选的,可选实现
- obsolete:过时的,新标准不支持
- deprecated:可取消的,将来可能被取消
DESCRIPTION:
- 用文字说明对象类型的含义
INDEX:
- 定义表对象的索引项
- 用于唯一标识表中的行
3.4.3 对象定义示例
tcpMaxConn OBJECT-TYPE
SYNTAX INTEGER
ACCESS read-only
STATUS mandatory
DESCRIPTION "The limit on the total number of TCP connections
the entity can support."
::= { tcp 4 }
3.5 标量对象和表对象
3.5.1 标量对象
标量对象是单一值的对象,直接对应一个管理信息项。
示例:
sysDescr OBJECT-TYPE
SYNTAX DisplayString (SIZE (0..255))
ACCESS read-only
STATUS mandatory
DESCRIPTION "A textual description of the entity."
::= { system 1 }
3.5.2 表对象
表对象是二维数组结构,用于存储相关信息的集合。
表定义特点:
- 整个表是同类型序列
- 每行是表的一个条目(Entry)
- 每行由多个不同类型的标量元素组成
- 需要定义索引来唯一标识每行
TCP连接表示例:
-- TCP连接表
tcpConnTable OBJECT-TYPE
SYNTAX SEQUENCE OF TcpConnEntry
ACCESS not-accessible
STATUS mandatory
DESCRIPTION "A table containing TCP connection-specific information."
::= { tcp 13 }
-- TCP连接表条目
tcpConnEntry OBJECT-TYPE
SYNTAX TcpConnEntry
ACCESS not-accessible
STATUS mandatory
DESCRIPTION "Information about a particular current TCP connection."
INDEX { tcpConnLocalAddress, tcpConnLocalPort,
tcpConnRemAddress, tcpConnRemPort }
::= { tcpConnTable 1 }
-- TCP连接表条目结构
TcpConnEntry ::= SEQUENCE {
tcpConnState INTEGER,
tcpConnLocalAddress IpAddress,
tcpConnLocalPort INTEGER (0..65535),
tcpConnRemAddress IpAddress,
tcpConnRemPort INTEGER (0..65535)
}
-- 连接状态
tcpConnState OBJECT-TYPE
SYNTAX INTEGER {
closed(1),
listen(2),
synSent(3),
synReceived(4),
established(5),
finWait1(6),
finWait2(7),
closeWait(8),
lastAck(9),
closing(10),
timeWait(11)
}
ACCESS read-write
STATUS mandatory
DESCRIPTION "The state of this TCP connection."
::= { tcpConnEntry 1 }
3.5.3 表对象访问
表对象的访问通过索引进行:
- 索引由多个字段组成
- 索引值的组合唯一标识表中的一行
- 例如:tcpConnState.192.168.1.1.80.192.168.1.100.1024
3.6 MIB-2功能组详解
3.6.1 System组(system)
包含系统级信息:
-- 系统描述
sysDescr OBJECT-TYPE
SYNTAX DisplayString (SIZE (0..255))
ACCESS read-only
STATUS mandatory
::= { system 1 }
-- 系统对象标识符
sysObjectID OBJECT-TYPE
SYNTAX OBJECT IDENTIFIER
ACCESS read-only
STATUS mandatory
::= { system 2 }
-- 系统启动时间
sysUpTime OBJECT-TYPE
SYNTAX TimeTicks
ACCESS read-only
STATUS mandatory
::= { system 3 }
-- 系统联系人
sysContact OBJECT-TYPE
SYNTAX DisplayString (SIZE (0..255))
ACCESS read-write
STATUS mandatory
::= { system 4 }
-- 系统名称
sysName OBJECT-TYPE
SYNTAX DisplayString (SIZE (0..255))
ACCESS read-write
STATUS mandatory
::= { system 5 }
-- 系统位置
sysLocation OBJECT-TYPE
SYNTAX DisplayString (SIZE (0..255))
ACCESS read-write
STATUS mandatory
::= { system 6 }
-- 系统服务
sysServices OBJECT-TYPE
SYNTAX INTEGER (0..127)
ACCESS read-only
STATUS mandatory
::= { system 7 }
3.6.2 Interfaces组(interfaces)
包含网络接口信息:
-- 接口数量
ifNumber OBJECT-TYPE
SYNTAX INTEGER
ACCESS read-only
STATUS mandatory
DESCRIPTION "The number of network interfaces present on this system."
::= { interfaces 1 }
-- 接口表
ifTable OBJECT-TYPE
SYNTAX SEQUENCE OF IfEntry
ACCESS not-accessible
STATUS mandatory
::= { interfaces 2 }
ifEntry OBJECT-TYPE
SYNTAX IfEntry
ACCESS not-accessible
STATUS mandatory
INDEX { ifIndex }
::= { ifTable 1 }
IfEntry ::= SEQUENCE {
ifIndex INTEGER,
ifDescr DisplayString,
ifType INTEGER,
ifMtu INTEGER,
ifSpeed Gauge,
ifPhysAddress PhysAddress,
ifAdminStatus INTEGER,
ifOperStatus INTEGER,
ifLastChange TimeTicks,
ifInOctets Counter,
ifInUcastPkts Counter,
ifInNUcastPkts Counter,
ifInDiscards Counter,
ifInErrors Counter,
ifInUnknownProtos Counter,
ifOutOctets Counter,
ifOutUcastPkts Counter,
ifOutNUcastPkts Counter,
ifOutDiscards Counter,
ifOutErrors Counter,
ifOutQLen Gauge,
ifSpecific OBJECT IDENTIFIER
}
3.6.3 IP组(ip)
包含IP协议相关信息:
-- IP转发开关
ipForwarding OBJECT-TYPE
SYNTAX INTEGER {
forwarding(1),
not-forwarding(2)
}
ACCESS read-write
STATUS mandatory
::= { ip 1 }
-- IP默认TTL
ipDefaultTTL OBJECT-TYPE
SYNTAX INTEGER (1..255)
ACCESS read-write
STATUS mandatory
::= { ip 2 }
-- IP统计信息
ipInReceives OBJECT-TYPE
SYNTAX Counter
ACCESS read-only
STATUS mandatory
::= { ip 3 }
-- IP地址表
ipAddrTable OBJECT-TYPE
SYNTAX SEQUENCE OF IpAddrEntry
ACCESS not-accessible
STATUS mandatory
::= { ip 20 }
-- IP路由表
ipRouteTable OBJECT-TYPE
SYNTAX SEQUENCE OF IpRouteEntry
ACCESS not-accessible
STATUS mandatory
::= { ip 21 }
3.7 编程实践
3.7.1 Python SNMP编程示例
安装SNMP库:
pip install pysnmp
基本SNMP操作示例:
from pysnmp.hlapi import *
def snmp_get(target, oid, community='public'):
"""SNMP GET操作"""
for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((target, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False):
if errorIndication:
print(f"错误: {errorIndication}")
break
elif errorStatus:
print(f"错误: {errorStatus.prettyPrint()} at {errorIndex and varBinds[int(errorIndex) - 1][0] or '?'}")
break
else:
for varBind in varBinds:
print(f"{varBind[0]} = {varBind[1]}")
def snmp_walk(target, oid, community='public'):
"""SNMP WALK操作"""
for (errorIndication, errorStatus, errorIndex, varBinds) in nextCmd(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((target, 161)),
ContextData(),
ObjectType(ObjectIdentity(oid)),
lexicographicMode=False,
ignoreNonIncreasingOid=False):
if errorIndication:
print(f"错误: {errorIndication}")
break
elif errorStatus:
print(f"错误: {errorStatus.prettyPrint()}")
break
else:
for varBind in varBinds:
print(f"{varBind[0]} = {varBind[1]}")
# 使用示例
if __name__ == "__main__":
target_host = "192.168.1.1"
# 获取系统描述
print("系统描述:")
snmp_get(target_host, "1.3.6.1.2.1.1.1.0")
# 获取系统启动时间
print("\n系统启动时间:")
snmp_get(target_host, "1.3.6.1.2.1.1.3.0")
# 遍历接口表
print("\n接口信息:")
snmp_walk(target_host, "1.3.6.1.2.1.2.2.1.2")
MIB对象定义示例:
from pysnmp.smi import builder, view, compiler
def load_mib_module():
"""加载MIB模块"""
# 创建MIB构建器
mibBuilder = builder.MibBuilder()
# 加载标准MIB模块
mibBuilder.loadModules('SNMPv2-MIB', 'IF-MIB', 'IP-MIB')
# 创建MIB视图
mibView = view.MibViewController(mibBuilder)
return mibView
def resolve_oid(mibView, oid_name):
"""解析OID名称到数字OID"""
try:
oid, label, suffix = mibView.getNodeByName(oid_name)
return '.'.join([str(x) for x in oid])
except Exception as e:
print(f"解析OID失败: {e}")
return None
def resolve_name(mibView, oid):
"""解析数字OID到名称"""
try:
oid_tuple = tuple([int(x) for x in oid.split('.')])
name, label, suffix = mibView.getNodeByOid(oid_tuple)
return str(name)
except Exception as e:
print(f"解析名称失败: {e}")
return None
# 使用示例
mibView = load_mib_module()
# OID名称到数字
print("sysDescr OID:", resolve_oid(mibView, ('SNMPv2-MIB', 'sysDescr')))
print("ifNumber OID:", resolve_oid(mibView, ('IF-MIB', 'ifNumber')))
# 数字OID到名称
print("1.3.6.1.2.1.1.1 名称:", resolve_name(mibView, "1.3.6.1.2.1.1.1"))
3.7.2 自定义MIB对象
定义自定义MIB模块:
EXAMPLE-MIB DEFINITIONS ::= BEGIN
IMPORTS
MODULE-IDENTITY, OBJECT-TYPE, Integer32, enterprises
FROM SNMPv2-SMI
DisplayString
FROM SNMPv2-TC;
exampleMIB MODULE-IDENTITY
LAST-UPDATED "202406080000Z"
ORGANIZATION "Example Organization"
CONTACT-INFO "contact@example.com"
DESCRIPTION "Example MIB module"
::= { enterprises 12345 }
-- 定义自定义对象
exampleObjects OBJECT IDENTIFIER ::= { exampleMIB 1 }
-- 服务器温度
serverTemperature OBJECT-TYPE
SYNTAX Integer32 (0..100)
UNITS "degrees Celsius"
MAX-ACCESS read-only
STATUS current
DESCRIPTION "Current server temperature in degrees Celsius"
::= { exampleObjects 1 }
-- 服务器状态
serverStatus OBJECT-TYPE
SYNTAX INTEGER {
running(1),
stopped(2),
maintenance(3),
error(4)
}
MAX-ACCESS read-write
STATUS current
DESCRIPTION "Current server status"
::= { exampleObjects 2 }
-- 服务器描述
serverDescription OBJECT-TYPE
SYNTAX DisplayString (SIZE(0..255))
MAX-ACCESS read-write
STATUS current
DESCRIPTION "Server description"
::= { exampleObjects 3 }
END
3.8 小结
MIB-2管理信息库是SNMP网络管理的核心,主要知识点包括:
-
基本概念:
- TCP/IP网络管理框架
- SNMP协议体系结构
- 团体关系和委托代理
-
MIB树结构:
- 层次树的三种作用
- 对象命名机制
- MIB-2功能组分类
-
数据类型系统:
- ASN.1通用类型
- 6种应用类型(Counter、Gauge、TimeTicks等)
- 各类型的特点和用途
-
对象定义:
- OBJECT-TYPE宏定义
- 语法、访问、状态等关键字段
- 标量对象和表对象的区别
-
功能组详解:
- System组:系统级信息
- Interfaces组:网络接口信息
- IP组:IP协议相关信息
-
编程实践:
- Python SNMP编程
- MIB模块加载和OID解析
- 自定义MIB对象定义
理解MIB-2的结构和对象定义对于网络管理系统的开发和维护至关重要。
第四章 简单网络管理协议SNMP
4.1 SNMP的演变历程
4.1.1 SNMPv1的产生
历史背景:
- 1987年11月:提出简单网关监控协议(SGMP)
- 1990-1991年:在SGMP基础上改进成SNMPv1
- 相关RFC文档:
- RFC 1155:管理信息结构(SMI)
- RFC 1157:SNMP协议规范
- RFC 1212:MIB定义方法
- RFC 1213:MIB-2规范
SNMPv1优点:
- 简单性:容易实现和理解
- 基于成熟的SGMP协议
- 有相当多的操作经验
- 得到制造商广泛支持
4.1.2 双轨制策略
1988年确定的网络管理标准开发策略:
短期解决方案(SNMP):
- 满足当前网络管理需要
- 用于管理配置简单的网络
- 可平稳过渡到新标准
长期解决方案(CMOT):
- OSI网络管理(CMIP Over TCP/IP)
- 应付未来复杂网络配置
- 提供更全面的管理功能
- 需要较长开发过程
双轨制策略停止的原因:
- SNMP向OSI管理过渡困难
- OSI系统管理标准开发进展缓慢
4.1.3 SNMPv1的安全缺陷
主要问题:
- 没有实质性安全设施
- 无数据源认证功能
- 不能防止偷听
- 许多制造商废除set命令
安全SNMP(S-SNMP): 1992年7月提出,增强安全功能:
- 用MD5算法保证数据完整性和数据源认证
- 用时间戳对报文排序
- 用DES算法提供数据加密功能
4.1.4 SNMPv2的发展
简单管理协议(SMP): 与S-SNMP同时提出,扩充方面:
- 适用范围:管理任意资源,支持应用管理、系统管理
- 复杂程度:保持简单性,提供数据块传送能力
- 安全设施:结合S-SNMP的安全功能
- 兼容性:适合多种网络协议
SNMPv2的形成:
- 以SMP为基础开发
- 放弃S-SNMP
- 组织两个工作组:功能扩展组和安全组
- 最终决定丢掉安全功能,保留其他功能
- 形成基于团体的SNMP(SNMPv2C)
- 1996年1月发布RFC 1901-1908
4.1.5 SNMPv3的诞生
发展动机:
- SNMPv2未达到"商业级别"安全要求
- 需要提供完整的安全功能
SNMPv3目标:
- 适应不同管理需求的各种操作环境
- 便于已有系统向SNMPv3过渡
- 方便建立和维护管理系统
发布历程:
- 1998年1月:发表5个建议标准文件
- 1999年4月:公布标准草案
- 增加安全和高层管理功能
- 与SNMPv1和SNMPv2兼容
4.2 SNMPv1协议数据单元
4.2.1 SNMPv1支持的操作
SNMP仅支持对管理对象值的简单操作:
基本操作:
- Get:管理站检索MIB中标量对象的值
- Set:管理站设置MIB中标量对象的值
- Trap:代理向管理站报告管理对象状态变化
操作限制:
- 不支持改变MIB结构(不能增删对象实例)
- 不能向管理对象发出执行动作的命令
- 只能逐个访问叶结点,不能一次访问子树
- 子树结点都是不可访问的
4.2.2 SNMP报文格式
SNMP报文由三部分组成:
+-------------+-------------+------------------+
| 版本号 | 团体名 | 协议数据单元(PDU) |
+-------------+-------------+------------------+
版本号:指示SNMP版本(SNMPv1为0) 团体名:用于身份认证的字符串 PDU:包含具体的管理操作信息
4.2.3 PDU类型和格式(😀)
PDU类型:
- GetRequest:获取请求
- GetNextRequest:获取下一个请求
- SetRequest:设置请求
- GetResponse:响应
- Trap:陷阱
通用PDU格式(除Trap外):
| PDU格式 | 请求标识 | 错误状态 | 错误索引 | 变量绑定表 |
|---|
字段说明:
- PDU类型:5种PDU类型之一
- 请求标识:唯一整数,用于区分不同请求
- 错误状态:6种错误状态,表示处理过程中的错误
- 错误索引:错误状态非0时指向出错变量
- 变量绑定表:变量名和对应值的表
错误状态类型:
- noError(0):无错误
- tooBig(1):响应PDU太大
- noSuchName(2):变量名不存在
- badValue(3):变量值错误
- readOnly(4):变量只读
- genError(5):一般错误
4.2.4 Trap PDU格式
Trap报文格式特殊,包含以下字段:
| 制造商ID | 代理地址 | 一般陷阱 | 特殊陷阱 | 时间戳 | 变量绑定表 |
|---|
字段说明:
- 制造商ID:设备制造商标识,与sysObjectID相同
- 代理地址:产生陷阱的代理IP地址
- 一般陷阱:SNMP定义的7种陷阱类型
- 特殊陷阱:设备相关的特殊陷阱代码
- 时间戳:代理发出陷阱的时间,与sysUpTime相同
一般陷阱类型:
- coldStart(0):冷启动
- warmStart(1):热启动
- linkDown(2):链路断开
- linkUp(3):链路连接
- authenticationFailure(4):认证失败
- egpNeighborLoss(5):EGP邻居丢失
- enterpriseSpecific(6):企业特定
4.3 SNMPv1操作详解
4.3.1 报文处理流程(😀)
发送报文流程:
- 按ASN.1格式构造PDU
- 认证进程检查通信权限
- 组装报文(版本号、团体名、PDU)
- BER编码
- 传输实体发送
接收报文流程:
- BER解码恢复ASN.1报文
- 语法分析和版本号验证
- 认证信息验证
- 分离PDU并进行语法分析
- 处理并返回应答报文
- 认证失败时生成陷阱报文
4.3.2 Get操作详解
Get操作特点:
- 检索标量对象值
- 支持一次检索多个对象
- 具有原子性:全部成功或全部失败
响应处理逻辑:
def process_get_request(request):
"""处理Get请求的伪代码"""
response = create_response(request.request_id)
for variable in request.variable_bindings:
try:
# 检查对象是否存在
if not object_exists(variable.name):
return error_response("noSuchName", variable.index)
# 检查访问权限
if not has_read_access(variable.name):
return error_response("noSuchName", variable.index)
# 获取对象值
value = get_object_value(variable.name)
response.add_variable(variable.name, value)
except TooBigError:
return error_response("tooBig", 0)
except Exception:
return error_response("genError", variable.index)
return response
Get操作示例:
# 请求示例
get_request = GetRequest(
request_id=12345,
variable_bindings=[
("1.3.6.1.2.1.7.1.0", None), # udpInDatagrams
("1.3.6.1.2.1.7.2.0", None), # udpNoPorts
("1.3.6.1.2.1.7.3.0", None), # udpInErrors
("1.3.6.1.2.1.7.4.0", None) # udpOutDatagrams
]
)
# 响应示例
get_response = GetResponse(
request_id=12345,
error_status=0,
error_index=0,
variable_bindings=[
("1.3.6.1.2.1.7.1.0", 100),
("1.3.6.1.2.1.7.2.0", 1),
("1.3.6.1.2.1.7.3.0", 2),
("1.3.6.1.2.1.7.4.0", 200)
]
)
4.3.3 GetNext操作详解
GetNext操作特点:
- 检索"下一个"对象实例
- 按词典顺序遍历MIB树
- 用于发现MIB结构
- 支持表遍历
GetNext处理逻辑:
def process_getnext_request(request):
"""处理GetNext请求的伪代码"""
response = create_response(request.request_id)
for variable in request.variable_bindings:
try:
# 查找下一个对象
next_oid = find_next_object(variable.name)
if next_oid is None:
return error_response("noSuchName", variable.index)
# 检查访问权限
if not has_read_access(next_oid):
# 继续查找下一个可访问对象
next_oid = find_next_accessible_object(next_oid)
if next_oid is None:
return error_response("noSuchName", variable.index)
value = get_object_value(next_oid)
response.add_variable(next_oid, value)
except Exception:
return error_response("genError", variable.index)
return response
GetNext操作示例:
# 使用GetNext发现MIB结构
def walk_mib_tree(agent, start_oid):
"""遍历MIB树"""
current_oid = start_oid
results = []
while True:
response = agent.get_next(current_oid)
if response.error_status != 0:
break
next_oid, value = response.variable_bindings[0]
# 检查是否还在目标子树中
if not next_oid.startswith(start_oid):
break
results.append((next_oid, value))
current_oid = next_oid
return results
# 遍历UDP组
udp_objects = walk_mib_tree(agent, "1.3.6.1.2.1.7")
4.3.4 表对象遍历
表遍历策略:
- 使用GetNext逐行遍历
- 利用索引值递增特性
- 检测表结束条件
表遍历示例:
def walk_table(agent, table_oid):
"""遍历SNMP表"""
table_data = []
current_oid = table_oid
while True:
response = agent.get_next(current_oid)
if response.error_status != 0:
break
next_oid, value = response.variable_bindings[0]
# 检查是否还在表中
if not next_oid.startswith(table_oid):
break
# 解析行索引
row_index = extract_row_index(next_oid, table_oid)
column_id = extract_column_id(next_oid, table_oid)
table_data.append({
'row_index': row_index,
'column_id': column_id,
'value': value
})
current_oid = next_oid
return organize_table_data(table_data)
# 遍历接口表
interface_table = walk_table(agent, "1.3.6.1.2.1.2.2.1")
4.3.5 Set操作详解
Set操作特点:
- 设置管理对象值
- 具有原子性
- 需要写访问权限
- 可能触发设备状态变化
Set处理逻辑:
def process_set_request(request):
"""处理Set请求的伪代码"""
response = create_response(request.request_id)
# 第一阶段:验证所有变量
for variable in request.variable_bindings:
if not object_exists(variable.name):
return error_response("noSuchName", variable.index)
if not has_write_access(variable.name):
return error_response("readOnly", variable.index)
if not validate_value(variable.name, variable.value):
return error_response("badValue", variable.index)
# 第二阶段:执行所有设置操作
try:
for variable in request.variable_bindings:
set_object_value(variable.name, variable.value)
response.add_variable(variable.name, variable.value)
except Exception:
# 回滚所有更改
rollback_changes()
return error_response("genError", 0)
return response
Set操作示例:
# 设置系统联系人信息
set_request = SetRequest(
request_id=12346,
variable_bindings=[
("1.3.6.1.2.1.1.4.0", "admin@company.com"), # sysContact
("1.3.6.1.2.1.1.6.0", "Server Room A") # sysLocation
]
)
# 设置接口管理状态
set_interface_status = SetRequest(
request_id=12347,
variable_bindings=[
("1.3.6.1.2.1.2.2.1.7.1", 1) # ifAdminStatus.1 = up(1)
]
)
4.3.6 Trap操作详解
Trap特点:
- 代理主动发送
- 无需应答
- 报告异常事件
- 异步通信机制
Trap生成示例:
def generate_trap(trap_type, specific_code=0, variables=None):
"""生成SNMP Trap"""
trap = TrapPDU(
enterprise=get_enterprise_oid(),
agent_addr=get_local_ip(),
generic_trap=trap_type,
specific_trap=specific_code,
time_stamp=get_system_uptime(),
variable_bindings=variables or []
)
return trap
# 链路断开陷阱
link_down_trap = generate_trap(
trap_type=2, # linkDown
variables=[
("1.3.6.1.2.1.2.2.1.1.1", 1), # ifIndex.1
("1.3.6.1.2.1.2.2.1.7.1", 2) # ifAdminStatus.1 = down(2)
]
)
# 认证失败陷阱
auth_failure_trap = generate_trap(
trap_type=4, # authenticationFailure
variables=[
("1.3.6.1.2.1.1.3.0", get_system_uptime())
]
)
4.4 实现问题
4.4.1 性能优化
批量操作:
def bulk_get(agent, oid_list, max_repetitions=10):
"""批量获取操作(模拟GetBulk)"""
results = {}
# 将OID列表分组,避免PDU过大
for oid_group in chunk_list(oid_list, max_repetitions):
request = GetRequest(
request_id=generate_request_id(),
variable_bindings=[(oid, None) for oid in oid_group]
)
response = agent.send_request(request)
if response.error_status == 0:
for oid, value in response.variable_bindings:
results[oid] = value
else:
# 处理错误,可能需要单独重试
handle_bulk_error(response, oid_group)
return results
缓存机制:
class SNMPCache:
"""SNMP响应缓存"""
def __init__(self, ttl=60):
self.cache = {}
self.ttl = ttl
def get(self, oid):
"""获取缓存值"""
if oid in self.cache:
value, timestamp = self.cache[oid]
if time.time() - timestamp < self.ttl:
return value
else:
del self.cache[oid]
return None
def set(self, oid, value):
"""设置缓存值"""
self.cache[oid] = (value, time.time())
def invalidate(self, oid_pattern):
"""失效匹配模式的缓存"""
to_remove = [oid for oid in self.cache.keys()
if oid.startswith(oid_pattern)]
for oid in to_remove:
del self.cache[oid]
4.4.2 错误处理
重试机制:
def reliable_snmp_get(agent, oid, max_retries=3, timeout=5):
"""可靠的SNMP GET操作"""
for attempt in range(max_retries):
try:
response = agent.get(oid, timeout=timeout)
if response.error_status == 0:
return response.variable_bindings[0][1]
elif response.error_status == 2: # noSuchName
raise SNMPObjectNotFound(f"Object {oid} not found")
else:
raise SNMPError(f"SNMP error: {response.error_status}")
except TimeoutError:
if attempt == max_retries - 1:
raise SNMPTimeout(f"Timeout after {max_retries} attempts")
time.sleep(2 ** attempt) # 指数退避
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(1)
错误分类处理:
class SNMPErrorHandler:
"""SNMP错误处理器"""
@staticmethod
def handle_error(error_status, error_index, variables):
"""处理SNMP错误"""
if error_status == 1: # tooBig
return SNMPErrorHandler.handle_too_big(variables)
elif error_status == 2: # noSuchName
return SNMPErrorHandler.handle_no_such_name(error_index, variables)
elif error_status == 3: # badValue
return SNMPErrorHandler.handle_bad_value(error_index, variables)
elif error_status == 4: # readOnly
return SNMPErrorHandler.handle_read_only(error_index, variables)
elif error_status == 5: # genError
return SNMPErrorHandler.handle_general_error(error_index, variables)
@staticmethod
def handle_too_big(variables):
"""处理PDU过大错误"""
# 分割变量列表,分批处理
chunk_size = len(variables) // 2
return [variables[:chunk_size], variables[chunk_size:]]
@staticmethod
def handle_no_such_name(error_index, variables):
"""处理对象不存在错误"""
problematic_var = variables[error_index - 1]
print(f"Object not found: {problematic_var[0]}")
# 从列表中移除问题变量,重试其他变量
return [var for i, var in enumerate(variables) if i != error_index - 1]
4.5 编程实践
4.5.1 完整的SNMP客户端实现
import socket
import time
from pyasn1.codec.der import encoder, decoder
from pyasn1.type import univ, namedtype, tag
class SNMPClient:
"""SNMP客户端实现"""
def __init__(self, host, port=161, community="public", timeout=5, retries=3):
self.host = host
self.port = port
self.community = community
self.timeout = timeout
self.retries = retries
self.request_id = 1
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.settimeout(timeout)
def _next_request_id(self):
"""生成下一个请求ID"""
self.request_id += 1
return self.request_id
def _create_pdu(self, pdu_type, variables):
"""创建PDU"""
from pysnmp.proto import rfc1157
pdu = rfc1157.GetRequestPDU()
pdu.setComponentByName('request-id', self._next_request_id())
pdu.setComponentByName('error-status', 0)
pdu.setComponentByName('error-index', 0)
var_binds = pdu.setComponentByName('variable-bindings')
for i, (oid, value) in enumerate(variables):
var_bind = var_binds.setComponentByPosition(i)
var_bind.setComponentByName('name', oid)
if value is not None:
var_bind.setComponentByName('value', value)
return pdu
def _send_request(self, pdu):
"""发送SNMP请求"""
# 创建SNMP消息
message = self._create_message(pdu)
# 编码并发送
encoded_message = encoder.encode(message)
for attempt in range(self.retries):
try:
self.socket.sendto(encoded_message, (self.host, self.port))
response_data, addr = self.socket.recvfrom(65535)
# 解码响应
response_message, _ = decoder.decode(response_data)
return self._parse_response(response_message)
except socket.timeout:
if attempt == self.retries - 1:
raise TimeoutError(f"SNMP timeout after {self.retries} attempts")
continue
def get(self, oid):
"""SNMP GET操作"""
variables = [(oid, None)]
pdu = self._create_pdu('get', variables)
response = self._send_request(pdu)
if response['error_status'] == 0:
return response['variables'][0][1]
else:
raise SNMPError(f"SNMP error: {response['error_status']}")
def get_next(self, oid):
"""SNMP GETNEXT操作"""
variables = [(oid, None)]
pdu = self._create_pdu('getnext', variables)
response = self._send_request(pdu)
if response['error_status'] == 0:
return response['variables'][0]
else:
raise SNMPError(f"SNMP error: {response['error_status']}")
def set(self, oid, value):
"""SNMP SET操作"""
variables = [(oid, value)]
pdu = self._create_pdu('set', variables)
response = self._send_request(pdu)
if response['error_status'] != 0:
raise SNMPError(f"SNMP error: {response['error_status']}")
def walk(self, start_oid):
"""SNMP WALK操作"""
results = []
current_oid = start_oid
while True:
try:
next_oid, value = self.get_next(current_oid)
# 检查是否还在目标子树中
if not next_oid.startswith(start_oid):
break
results.append((next_oid, value))
current_oid = next_oid
except SNMPError:
break
return results
def close(self):
"""关闭连接"""
self.socket.close()
# 使用示例
def main():
client = SNMPClient("192.168.1.1", community="public")
try:
# 获取系统描述
sys_descr = client.get("1.3.6.1.2.1.1.1.0")
print(f"系统描述: {sys_descr}")
# 获取系统启动时间
sys_uptime = client.get("1.3.6.1.2.1.1.3.0")
print(f"系统启动时间: {sys_uptime}")
# 遍历接口表
print("\n接口信息:")
interfaces = client.walk("1.3.6.1.2.1.2.2.1.2")
for oid, desc in interfaces:
print(f"{oid}: {desc}")
# 设置系统联系人
client.set("1.3.6.1.2.1.1.4.0", "admin@example.com")
print("系统联系人已更新")
except Exception as e:
print(f"错误: {e}")
finally:
client.close()
if __name__ == "__main__":
main()
4.5.2 SNMP代理实现示例
class SNMPAgent:
"""简单的SNMP代理实现"""
def __init__(self, host="0.0.0.0", port=161, community="public"):
self.host = host
self.port = port
self.community = community
self.mib = {}
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind((host, port))
self.running = False
# 初始化基本MIB对象
self._init_system_mib()
def _init_system_mib(self):
"""初始化系统MIB对象"""
self.mib.update({
"1.3.6.1.2.1.1.1.0": "Linux Router 1.0", # sysDescr
"1.3.6.1.2.1.1.2.0": "1.3.6.1.4.1.8072.3.2.10", # sysObjectID
"1.3.6.1.2.1.1.3.0": self._get_uptime, # sysUpTime (函数)
"1.3.6.1.2.1.1.4.0": "admin@localhost", # sysContact
"1.3.6.1.2.1.1.5.0": "localhost", # sysName
"1.3.6.1.2.1.1.6.0": "Unknown", # sysLocation
"1.3.6.1.2.1.1.7.0": 72, # sysServices
})
def _get_uptime(self):
"""获取系统启动时间"""
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
return int(uptime_seconds * 100) # 转换为centiseconds
def register_object(self, oid, value_or_function, access="read-only"):
"""注册MIB对象"""
self.mib[oid] = {
'value': value_or_function,
'access': access
}
def get_object_value(self, oid):
"""获取对象值"""
if oid in self.mib:
obj = self.mib[oid]
if callable(obj):
return obj()
elif isinstance(obj, dict):
value = obj['value']
return value() if callable(value) else value
else:
return obj
else:
raise SNMPError("noSuchName")
def set_object_value(self, oid, value):
"""设置对象值"""
if oid not in self.mib:
raise SNMPError("noSuchName")
obj = self.mib[oid]
if isinstance(obj, dict):
if obj['access'] == 'read-only':
raise SNMPError("readOnly")
obj['value'] = value
else:
raise SNMPError("readOnly")
def find_next_object(self, oid):
"""查找下一个对象"""
# 按词典顺序排序所有OID
sorted_oids = sorted(self.mib.keys())
for next_oid in sorted_oids:
if next_oid > oid:
return next_oid
return None
def process_get_request(self, pdu):
"""处理GET请求"""
response_vars = []
for var_oid, _ in pdu['variables']:
try:
value = self.get_object_value(var_oid)
response_vars.append((var_oid, value))
except SNMPError as e:
return self._create_error_response(pdu, str(e), 1)
return self._create_response(pdu, response_vars)
def process_getnext_request(self, pdu):
"""处理GETNEXT请求"""
response_vars = []
for var_oid, _ in pdu['variables']:
try:
next_oid = self.find_next_object(var_oid)
if next_oid:
value = self.get_object_value(next_oid)
response_vars.append((next_oid, value))
else:
return self._create_error_response(pdu, "noSuchName", 1)
except SNMPError as e:
return self._create_error_response(pdu, str(e), 1)
return self._create_response(pdu, response_vars)
def process_set_request(self, pdu):
"""处理SET请求"""
# 首先验证所有变量
for i, (var_oid, value) in enumerate(pdu['variables']):
if var_oid not in self.mib:
return self._create_error_response(pdu, "noSuchName", i + 1)
# 执行设置操作
response_vars = []
for i, (var_oid, value) in enumerate(pdu['variables']):
try:
self.set_object_value(var_oid, value)
response_vars.append((var_oid, value))
except SNMPError as e:
return self._create_error_response(pdu, str(e), i + 1)
return self._create_response(pdu, response_vars)
def run(self):
"""运行代理"""
self.running = True
print(f"SNMP Agent listening on {self.host}:{self.port}")
while self.running:
try:
data, addr = self.socket.recvfrom(65535)
# 解码请求
request = self._decode_message(data)
# 验证团体名
if request['community'] != self.community:
continue
# 处理请求
pdu = request['pdu']
if pdu['type'] == 'get':
response = self.process_get_request(pdu)
elif pdu['type'] == 'getnext':
response = self.process_getnext_request(pdu)
elif pdu['type'] == 'set':
response = self.process_set_request(pdu)
else:
continue
# 发送响应
response_data = self._encode_message(response)
self.socket.sendto(response_data, addr)
except Exception as e:
print(f"Error processing request: {e}")
def stop(self):
"""停止代理"""
self.running = False
self.socket.close()
# 使用示例
def run_agent():
agent = SNMPAgent()
# 注册自定义对象
agent.register_object("1.3.6.1.4.1.12345.1.1.0", "Custom Object", "read-write")
agent.register_object("1.3.6.1.4.1.12345.1.2.0", lambda: time.time(), "read-only")
try:
agent.run()
except KeyboardInterrupt:
print("\nStopping agent...")
agent.stop()
if __name__ == "__main__":
run_agent()
4.6 小结
SNMP协议是网络管理的核心协议,主要知识点包括:
-
演变历程:
- SNMPv1:简单但缺乏安全性
- SNMPv2:增强功能但安全问题未解决
- SNMPv3:完整的安全和管理功能
-
协议结构:
- 报文格式:版本号+团体名+PDU
- PDU类型:Get、GetNext、Set、GetResponse、Trap
- 错误处理机制
-
基本操作:
- Get:检索标量对象值
- GetNext:遍历MIB树和表
- Set:设置对象值
- Trap:异步事件报告
-
实现要点:
- 原子性操作
- 错误处理和重试机制
- 性能优化策略
- 缓存和批量操作
-
编程实践:
- 客户端实现
- 代理实现
- 错误处理
- 性能优化
理解SNMP协议的工作原理和实现细节对于网络管理系统的开发至关重要。
第五章 远程网络监视RMON
5.1 RMON基本概念
5.1.1 RMON概述
**远程网络监视(RMON)**是对SNMP标准的重要补充,是简单网络管理向互联网管理过渡的重要步骤。
RMON与MIB-2的区别:
- MIB-2:只能提供单个设备的管理信息(如进出某设备的分组数或字节数)
- RMON:能够监视整个网络的通信情况,提供网络级别的统计信息
网络监视器(Monitor/Analyzer/Probe):
- 观察LAN上出现的每个分组
- 进行统计和总结
- 提供重要的管理信息
- 存储部分分组供后续分析
- 根据分组类型进行过滤和捕获
5.1.2 RMON的目标
1. 离线操作
- 管理站可以停止对监视器的轮询
- 节省网络带宽和通信费用
- 监视器持续收集故障、性能和配置信息
- 在异常情况时及时报告管理站
2. 主动监视
- 连续或周期运行诊断程序
- 收集并记录网络性能参数
- 在子网失效时通知管理站
- 提供有用的诊断信息
3. 问题检测和报告
- 被动获取网络数据
- 连续观察网络资源消耗情况
- 记录异常条件(如网络拥塞)
- 在出现错误时通知管理站
4. 提供增值数据
- 分析收集到的子网数据
- 减轻管理站的计算任务
- 计算主机通信和出错统计
- 本地处理比远程处理更有效
5. 多管理站操作
- 支持多个管理站并发工作
- 提高可靠性
- 分布实现不同管理功能
- 为不同管理站提供不同信息
5.1.3 RMON表管理原理
新数据类型定义:
OwnerString ::= DisplayString
EntryStatus ::= INTEGER {
valid(1),
createRequest(2),
underCreation(3),
invalid(4)
}
表结构组成:
- 控制表:定义数据表的结构
- 数据表:存储实际数据
控制表列对象:
- rmlControlIndex:唯一标识控制表中的一行
- rmlControlParameter:应用于所有数据行的控制参数
- rmlControlOwner:控制行的所有者
- rmlControlStatus:控制行的状态
数据表索引:
- rmlDataControlIndex:与控制行索引值相同
- rmlDataIndex:唯一指定数据行集合中的某一行
5.1.4 表行管理操作
增加行的规则:
- 管理站用SetRequest生成新行
- 检查索引值是否冲突
- 代理产生新行,状态设为createRequest(2)
- 代理将状态置为underCreation(3)
- 管理站设置所有必要的列对象
- 管理站将状态设为valid(1)
删除行的方法:
- 只有行的所有者才能删除
- 发出SetRequest PDU将状态置为invalid(4)
- 物理删除取决于具体实现
修改行的步骤:
- 将行状态置为invalid(4)
- 用SetRequest PDU改变其他对象值
行状态转换图:
createRequest(2) → underCreation(3) → valid(1)
↓
invalid(4)
5.1.5 多管理站访问控制
并发访问问题:
- 多个管理站并发访问可能超过监视器服务能力
- 一个管理站可能长时间占用资源
- 占用资源的管理站可能崩溃而未释放资源
Owner字段的作用:
- 管理站能识别自己所属的资源
- 网络操作员可以了解资源占用情况
- 授权操作员可以释放其他操作员的资源
- 管理站重启后应释放不再使用的资源
5.2 RMON管理信息库
5.2.1 RMON MIB结构
RMON MIB是MIB-2下面的第16个子树,分为10个功能组:
- statistics(1):统计组
- history(2):历史组
- alarm(3):报警组
- hosts(4):主机组
- hostTopN(5):最高N台主机组
- matrix(6):矩阵组
- filter(7):过滤组
- capture(8):捕获组
- event(9):事件组
- tokenRing(10):令牌环组
实现约束关系:
- 实现报警组时必须实现事件组
- 实现最高N台主机组时必须实现主机组
- 实现捕获组时必须实现过滤组
5.2.2 统计组(Statistics Group)
功能:提供子网统计信息表,每行表示一个子网的统计信息。
主要对象:
etherStatsTable OBJECT-TYPE
SYNTAX SEQUENCE OF EtherStatsEntry
ACCESS not-accessible
STATUS mandatory
DESCRIPTION "A list of Ethernet statistics entries."
::= { statistics 1 }
etherStatsEntry OBJECT-TYPE
SYNTAX EtherStatsEntry
ACCESS not-accessible
STATUS mandatory
INDEX { etherStatsIndex }
::= { etherStatsTable 1 }
EtherStatsEntry ::= SEQUENCE {
etherStatsIndex INTEGER,
etherStatsDataSource OBJECT IDENTIFIER,
etherStatsDropEvents Counter,
etherStatsOctets Counter,
etherStatsPkts Counter,
etherStatsBroadcastPkts Counter,
etherStatsMulticastPkts Counter,
etherStatsCRCAlignErrors Counter,
etherStatsUndersizePkts Counter,
etherStatsOversizePkts Counter,
etherStatsFragments Counter,
etherStatsJabbers Counter,
etherStatsCollisions Counter,
etherStatsPkts64Octets Counter,
etherStatsPkts65to127Octets Counter,
etherStatsPkts128to255Octets Counter,
etherStatsPkts256to511Octets Counter,
etherStatsPkts512to1023Octets Counter,
etherStatsPkts1024to1518Octets Counter,
etherStatsOwner OwnerString,
etherStatsStatus EntryStatus
}
关键字段说明:
- etherStatsIndex:表项索引,对应子网接口
- etherStatsDataSource:监视器接收数据的子网接口(ifIndex实例)
- etherStatsDropEvents:由于资源不足而丢弃的事件数
- etherStatsOctets:接收的字节总数
- etherStatsPkts:接收的分组总数
- etherStatsCRCAlignErrors:CRC和对准错误数
- etherStatsCollisions:冲突数
5.2.3 历史组(History Group)
功能:定期采样统计组中的计数器,保存历史数据。
控制表:
historyControlTable OBJECT-TYPE
SYNTAX SEQUENCE OF HistoryControlEntry
ACCESS not-accessible
STATUS mandatory
::= { history 1 }
HistoryControlEntry ::= SEQUENCE {
historyControlIndex INTEGER,
historyControlDataSource OBJECT IDENTIFIER,
historyControlBucketsRequested INTEGER,
historyControlBucketsGranted INTEGER,
historyControlInterval INTEGER,
historyControlOwner OwnerString,
historyControlStatus EntryStatus
}
数据表:
etherHistoryTable OBJECT-TYPE
SYNTAX SEQUENCE OF EtherHistoryEntry
ACCESS not-accessible
STATUS mandatory
::= { history 2 }
EtherHistoryEntry ::= SEQUENCE {
etherHistoryIndex INTEGER,
etherHistorySampleIndex INTEGER,
etherHistoryIntervalStart TimeTicks,
etherHistoryDropEvents Counter,
etherHistoryOctets Counter,
etherHistoryPkts Counter,
etherHistoryBroadcastPkts Counter,
etherHistoryMulticastPkts Counter,
etherHistoryCRCAlignErrors Counter,
etherHistoryUndersizePkts Counter,
etherHistoryOversizePkts Counter,
etherHistoryFragments Counter,
etherHistoryJabbers Counter,
etherHistoryCollisions Counter,
etherHistoryUtilization INTEGER
}
5.2.4 报警组(Alarm Group)
功能:定期检查统计变量,当变量值超过阈值时产生事件。
报警表定义:
alarmTable OBJECT-TYPE
SYNTAX SEQUENCE OF AlarmEntry
ACCESS not-accessible
STATUS mandatory
::= { alarm 1 }
AlarmEntry ::= SEQUENCE {
alarmIndex INTEGER,
alarmInterval INTEGER,
alarmVariable OBJECT IDENTIFIER,
alarmSampleType INTEGER,
alarmValue INTEGER,
alarmStartupAlarm INTEGER,
alarmRisingThreshold INTEGER,
alarmFallingThreshold INTEGER,
alarmRisingEventIndex INTEGER,
alarmFallingEventIndex INTEGER,
alarmOwner OwnerString,
alarmStatus EntryStatus
}
采样类型:
- absoluteValue(1):绝对值采样
- deltaValue(2):增量值采样
5.2.5 主机组(Hosts Group)
功能:收集每个主机的统计信息。
主机表定义:
hostTable OBJECT-TYPE
SYNTAX SEQUENCE OF HostEntry
ACCESS not-accessible
STATUS mandatory
::= { hosts 4 }
HostEntry ::= SEQUENCE {
hostAddress OCTET STRING,
hostCreationOrder INTEGER,
hostIndex INTEGER,
hostInPkts Counter,
hostOutPkts Counter,
hostInOctets Counter,
hostOutOctets Counter,
hostOutErrors Counter,
hostOutBroadcastPkts Counter,
hostOutMulticastPkts Counter
}
5.2.6 矩阵组(Matrix Group)
功能:收集主机对之间的通信统计信息。
矩阵表定义:
matrixSDTable OBJECT-TYPE
SYNTAX SEQUENCE OF MatrixSDEntry
ACCESS not-accessible
STATUS mandatory
::= { matrix 1 }
MatrixSDEntry ::= SEQUENCE {
matrixSDSourceAddress OCTET STRING,
matrixSDDestAddress OCTET STRING,
matrixSDIndex INTEGER,
matrixSDPkts Counter,
matrixSDOctets Counter,
matrixSDErrors Counter
}
5.3 编程实践
5.3.1 RMON统计信息收集
import time
import threading
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class EthernetStats:
"""以太网统计信息"""
octets: int = 0
pkts: int = 0
broadcast_pkts: int = 0
multicast_pkts: int = 0
crc_align_errors: int = 0
undersize_pkts: int = 0
oversize_pkts: int = 0
fragments: int = 0
jabbers: int = 0
collisions: int = 0
pkts_64_octets: int = 0
pkts_65_to_127_octets: int = 0
pkts_128_to_255_octets: int = 0
pkts_256_to_511_octets: int = 0
pkts_512_to_1023_octets: int = 0
pkts_1024_to_1518_octets: int = 0
drop_events: int = 0
class RMONStatisticsCollector:
"""RMON统计信息收集器"""
def __init__(self):
self.interfaces = {} # interface_index -> EthernetStats
self.running = False
self.collection_thread = None
self.lock = threading.Lock()
def add_interface(self, interface_index: int, data_source: str):
"""添加监控接口"""
with self.lock:
self.interfaces[interface_index] = {
'data_source': data_source,
'stats': EthernetStats(),
'owner': 'monitor',
'status': 'valid'
}
def remove_interface(self, interface_index: int):
"""移除监控接口"""
with self.lock:
if interface_index in self.interfaces:
del self.interfaces[interface_index]
def update_packet_stats(self, interface_index: int, packet_size: int,
packet_type: str, has_error: bool = False):
"""更新分组统计信息"""
with self.lock:
if interface_index not in self.interfaces:
return
stats = self.interfaces[interface_index]['stats']
# 更新基本统计
stats.octets += packet_size
stats.pkts += 1
# 根据分组类型更新
if packet_type == 'broadcast':
stats.broadcast_pkts += 1
elif packet_type == 'multicast':
stats.multicast_pkts += 1
# 根据分组大小分类
if packet_size == 64:
stats.pkts_64_octets += 1
elif 65 <= packet_size <= 127:
stats.pkts_65_to_127_octets += 1
elif 128 <= packet_size <= 255:
stats.pkts_128_to_255_octets += 1
elif 256 <= packet_size <= 511:
stats.pkts_256_to_511_octets += 1
elif 512 <= packet_size <= 1023:
stats.pkts_512_to_1023_octets += 1
elif 1024 <= packet_size <= 1518:
stats.pkts_1024_to_1518_octets += 1
elif packet_size > 1518:
stats.oversize_pkts += 1
elif packet_size < 64:
stats.undersize_pkts += 1
# 错误统计
if has_error:
stats.crc_align_errors += 1
def get_interface_stats(self, interface_index: int) -> Optional[EthernetStats]:
"""获取接口统计信息"""
with self.lock:
if interface_index in self.interfaces:
return self.interfaces[interface_index]['stats']
return None
def get_all_stats(self) -> Dict[int, EthernetStats]:
"""获取所有接口统计信息"""
with self.lock:
return {idx: info['stats'] for idx, info in self.interfaces.items()}
# 使用示例
def demo_rmon_statistics():
"""RMON统计演示"""
collector = RMONStatisticsCollector()
# 添加监控接口
collector.add_interface(1, "eth0")
collector.add_interface(2, "eth1")
# 模拟分组统计
import random
for _ in range(1000):
interface = random.choice([1, 2])
size = random.randint(64, 1518)
pkt_type = random.choice(['unicast', 'broadcast', 'multicast'])
has_error = random.random() < 0.01 # 1%错误率
collector.update_packet_stats(interface, size, pkt_type, has_error)
# 获取统计结果
for interface_idx in [1, 2]:
stats = collector.get_interface_stats(interface_idx)
if stats:
print(f"接口 {interface_idx} 统计:")
print(f" 总字节数: {stats.octets}")
print(f" 总分组数: {stats.pkts}")
print(f" 广播分组: {stats.broadcast_pkts}")
print(f" 组播分组: {stats.multicast_pkts}")
print(f" CRC错误: {stats.crc_align_errors}")
print()
if __name__ == "__main__":
demo_rmon_statistics()
5.3.2 RMON历史数据管理
import time
from collections import deque
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional
@dataclass
class HistorySample:
"""历史采样数据"""
sample_index: int
interval_start: int
drop_events: int
octets: int
pkts: int
broadcast_pkts: int
multicast_pkts: int
crc_align_errors: int
undersize_pkts: int
oversize_pkts: int
fragments: int
jabbers: int
collisions: int
utilization: int # 百分比
class RMONHistoryCollector:
"""RMON历史数据收集器"""
def __init__(self):
self.history_controls = {} # control_index -> control_info
self.history_data = {} # control_index -> deque of samples
self.running = False
self.collection_thread = None
def create_history_control(self, control_index: int, data_source: str,
buckets_requested: int = 50, interval: int = 1800):
"""创建历史控制条目"""
self.history_controls[control_index] = {
'data_source': data_source,
'buckets_requested': buckets_requested,
'buckets_granted': min(buckets_requested, 65535),
'interval': interval, # 秒
'owner': 'monitor',
'status': 'valid',
'last_sample_time': 0,
'sample_counter': 0
}
# 初始化历史数据队列
max_samples = self.history_controls[control_index]['buckets_granted']
self.history_data[control_index] = deque(maxlen=max_samples)
def collect_sample(self, control_index: int, stats: EthernetStats):
"""收集历史采样"""
if control_index not in self.history_controls:
return
control = self.history_controls[control_index]
current_time = int(time.time())
# 检查是否到了采样时间
if current_time - control['last_sample_time'] < control['interval']:
return
# 创建新的采样
control['sample_counter'] += 1
sample = HistorySample(
sample_index=control['sample_counter'],
interval_start=current_time,
drop_events=stats.drop_events,
octets=stats.octets,
pkts=stats.pkts,
broadcast_pkts=stats.broadcast_pkts,
multicast_pkts=stats.multicast_pkts,
crc_align_errors=stats.crc_align_errors,
undersize_pkts=stats.undersize_pkts,
oversize_pkts=stats.oversize_pkts,
fragments=stats.fragments,
jabbers=stats.jabbers,
collisions=stats.collisions,
utilization=self._calculate_utilization(stats, control['interval'])
)
# 添加到历史数据队列
self.history_data[control_index].append(sample)
control['last_sample_time'] = current_time
def _calculate_utilization(self, stats: EthernetStats, interval: int) -> int:
"""计算链路利用率"""
# 假设10Mbps以太网
max_bits_per_second = 10 * 1000 * 1000
actual_bits = stats.octets * 8
utilization = (actual_bits / (max_bits_per_second * interval)) * 100
return min(int(utilization), 100)
def get_history_samples(self, control_index: int,
start_sample: int = None,
end_sample: int = None) -> List[HistorySample]:
"""获取历史采样数据"""
if control_index not in self.history_data:
return []
samples = list(self.history_data[control_index])
if start_sample is not None:
samples = [s for s in samples if s.sample_index >= start_sample]
if end_sample is not None:
samples = [s for s in samples if s.sample_index <= end_sample]
return samples
def get_latest_samples(self, control_index: int, count: int = 10) -> List[HistorySample]:
"""获取最新的N个采样"""
if control_index not in self.history_data:
return []
samples = list(self.history_data[control_index])
return samples[-count:] if len(samples) >= count else samples
# 使用示例
def demo_rmon_history():
"""RMON历史数据演示"""
history_collector = RMONHistoryCollector()
stats_collector = RMONStatisticsCollector()
# 创建历史控制
history_collector.create_history_control(
control_index=1,
data_source="eth0",
buckets_requested=24, # 24小时
interval=3600 # 每小时采样
)
# 添加统计接口
stats_collector.add_interface(1, "eth0")
# 模拟数据收集
for hour in range(24):
# 模拟一小时的流量
for _ in range(1000):
size = random.randint(64, 1518)
pkt_type = random.choice(['unicast', 'broadcast', 'multicast'])
stats_collector.update_packet_stats(1, size, pkt_type)
# 收集历史采样
current_stats = stats_collector.get_interface_stats(1)
if current_stats:
history_collector.collect_sample(1, current_stats)
# 模拟时间推进
time.sleep(0.1) # 实际应用中是3600秒
# 获取历史数据
samples = history_collector.get_latest_samples(1, 10)
print("最近10个历史采样:")
for sample in samples:
print(f"采样 {sample.sample_index}: "
f"分组数={sample.pkts}, "
f"字节数={sample.octets}, "
f"利用率={sample.utilization}%")
if __name__ == "__main__":
demo_rmon_history()
5.3.3 RMON报警系统
import time
import threading
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Callable, Optional
class AlarmSampleType(Enum):
ABSOLUTE_VALUE = 1
DELTA_VALUE = 2
class AlarmStartup(Enum):
RISING = 1
FALLING = 2
RISING_OR_FALLING = 3
@dataclass
class AlarmEntry:
"""报警条目"""
alarm_index: int
interval: int
variable: str # OID
sample_type: AlarmSampleType
rising_threshold: int
falling_threshold: int
rising_event_index: int
falling_event_index: int
startup_alarm: AlarmStartup
owner: str
status: str
last_sample_value: Optional[int] = None
last_sample_time: int = 0
alarm_state: str = "normal" # normal, rising, falling
class RMONAlarmManager:
"""RMON报警管理器"""
def __init__(self, event_manager):
self.alarms = {} # alarm_index -> AlarmEntry
self.event_manager = event_manager
self.running = False
self.alarm_thread = None
self.lock = threading.Lock()
def create_alarm(self, alarm_index: int, interval: int, variable: str,
sample_type: AlarmSampleType, rising_threshold: int,
falling_threshold: int, rising_event_index: int,
falling_event_index: int, startup_alarm: AlarmStartup,
owner: str = "monitor"):
"""创建报警条目"""
with self.lock:
alarm = AlarmEntry(
alarm_index=alarm_index,
interval=interval,
variable=variable,
sample_type=sample_type,
rising_threshold=rising_threshold,
falling_threshold=falling_threshold,
rising_event_index=rising_event_index,
falling_event_index=falling_event_index,
startup_alarm=startup_alarm,
owner=owner,
status="valid"
)
self.alarms[alarm_index] = alarm
def delete_alarm(self, alarm_index: int):
"""删除报警条目"""
with self.lock:
if alarm_index in self.alarms:
del self.alarms[alarm_index]
def check_alarm(self, alarm_index: int, current_value: int):
"""检查报警条件"""
with self.lock:
if alarm_index not in self.alarms:
return
alarm = self.alarms[alarm_index]
current_time = int(time.time())
# 检查采样间隔
if current_time - alarm.last_sample_time < alarm.interval:
return
# 计算采样值
if alarm.sample_type == AlarmSampleType.ABSOLUTE_VALUE:
sample_value = current_value
else: # DELTA_VALUE
if alarm.last_sample_value is not None:
sample_value = current_value - alarm.last_sample_value
else:
sample_value = 0
# 检查阈值
self._check_thresholds(alarm, sample_value)
# 更新采样信息
alarm.last_sample_value = current_value
alarm.last_sample_time = current_time
def _check_thresholds(self, alarm: AlarmEntry, sample_value: int):
"""检查阈值条件"""
# 上升阈值检查
if (sample_value >= alarm.rising_threshold and
alarm.alarm_state != "rising"):
alarm.alarm_state = "rising"
self.event_manager.trigger_event(
alarm.rising_event_index,
f"Rising alarm on {alarm.variable}: {sample_value} >= {alarm.rising_threshold}"
)
# 下降阈值检查
elif (sample_value <= alarm.falling_threshold and
alarm.alarm_state != "falling"):
alarm.alarm_state = "falling"
self.event_manager.trigger_event(
alarm.falling_event_index,
f"Falling alarm on {alarm.variable}: {sample_value} <= {alarm.falling_threshold}"
)
# 正常状态
elif (alarm.falling_threshold < sample_value < alarm.rising_threshold):
alarm.alarm_state = "normal"
def start_monitoring(self):
"""开始监控"""
self.running = True
self.alarm_thread = threading.Thread(target=self._monitor_loop)
self.alarm_thread.start()
def stop_monitoring(self):
"""停止监控"""
self.running = False
if self.alarm_thread:
self.alarm_thread.join()
def _monitor_loop(self):
"""监控循环"""
while self.running:
# 这里应该从实际的MIB对象获取值
# 为演示目的,我们使用模拟值
for alarm_index in list(self.alarms.keys()):
# 模拟获取变量值
current_value = self._get_variable_value(
self.alarms[alarm_index].variable
)
if current_value is not None:
self.check_alarm(alarm_index, current_value)
time.sleep(1)
def _get_variable_value(self, variable: str) -> Optional[int]:
"""获取变量值(模拟实现)"""
# 实际实现中应该从SNMP代理获取值
import random
return random.randint(0, 1000)
class RMONEventManager:
"""RMON事件管理器"""
def __init__(self):
self.events = {} # event_index -> event_info
self.event_log = []
self.lock = threading.Lock()
def create_event(self, event_index: int, event_type: str,
description: str, owner: str = "monitor"):
"""创建事件条目"""
with self.lock:
self.events[event_index] = {
'type': event_type, # log, snmp-trap, log-and-trap
'description': description,
'owner': owner,
'status': 'valid'
}
def trigger_event(self, event_index: int, message: str):
"""触发事件"""
with self.lock:
if event_index not in self.events:
return
event = self.events[event_index]
timestamp = int(time.time())
# 记录事件日志
log_entry = {
'event_index': event_index,
'timestamp': timestamp,
'message': message,
'description': event['description']
}
self.event_log.append(log_entry)
# 根据事件类型执行相应动作
if event['type'] in ['log', 'log-and-trap']:
self._log_event(log_entry)
if event['type'] in ['snmp-trap', 'log-and-trap']:
self._send_trap(log_entry)
def _log_event(self, log_entry: dict):
"""记录事件到日志"""
print(f"[{log_entry['timestamp']}] Event {log_entry['event_index']}: "
f"{log_entry['message']}")
def _send_trap(self, log_entry: dict):
"""发送SNMP陷阱"""
# 实际实现中应该发送SNMP Trap
print(f"TRAP: Event {log_entry['event_index']} - {log_entry['message']}")
def get_event_log(self, max_entries: int = 100) -> List[dict]:
"""获取事件日志"""
with self.lock:
return self.event_log[-max_entries:]
# 使用示例
def demo_rmon_alarm():
"""RMON报警演示"""
event_manager = RMONEventManager()
alarm_manager = RMONAlarmManager(event_manager)
# 创建事件
event_manager.create_event(1, "log-and-trap", "High traffic alarm")
event_manager.create_event(2, "log", "Low traffic alarm")
# 创建报警
alarm_manager.create_alarm(
alarm_index=1,
interval=30, # 30秒检查一次
variable="1.3.6.1.2.1.2.2.1.10.1", # ifInOctets.1
sample_type=AlarmSampleType.DELTA_VALUE,
rising_threshold=1000000, # 1MB/30s
falling_threshold=100000, # 100KB/30s
rising_event_index=1,
falling_event_index=2,
startup_alarm=AlarmStartup.RISING_OR_FALLING
)
# 开始监控
alarm_manager.start_monitoring()
# 运行一段时间
time.sleep(10)
# 停止监控
alarm_manager.stop_monitoring()
# 查看事件日志
log_entries = event_manager.get_event_log()
print(f"\n事件日志 ({len(log_entries)} 条):")
for entry in log_entries:
print(f" {entry['timestamp']}: {entry['message']}")
if __name__ == "__main__":
demo_rmon_alarm()
5.4 小结
RMON远程网络监视是网络管理的重要扩展,主要知识点包括:
-
基本概念:
- RMON是SNMP的重要补充
- 提供网络级别的监视能力
- 支持离线操作和主动监视
-
表管理机制:
- 控制表和数据表结构
- 行状态管理(创建、修改、删除)
- 多管理站访问控制
-
功能组结构:
- 10个功能组,各有特定用途
- 实现约束关系
- 可选实现机制
-
核心功能组:
- 统计组:基本网络统计
- 历史组:历史数据管理
- 报警组:阈值监控
- 主机组:主机级统计
- 矩阵组:主机对通信统计
-
编程实践:
- 统计信息收集
- 历史数据管理
- 报警系统实现
- 事件管理机制
RMON为网络管理提供了强大的监视和分析能力,是现代网络管理系统的重要组成部分。
第六章 网络管理工具(<font color="#f79646">考命令</font>)
6.1 Windows管理命令
Windows操作系统提供了一组实用的网络配置和管理命令,这些命令以DOS命令形式出现,存储在system32目录中。通过运行"cmd.exe"进入命令窗口,可以执行各种网络管理任务。
6.1.1 ipconfig命令
功能:显示和配置TCP/IP网络参数,相当于Windows 9x中的Winipcfg图形化工具。
语法:
ipconfig [/all] [/renew[Adapter]] [/release[Adapter]] [/flushdns]
[/displaydns] [/registerdns] [/showclassid Adapter]
[/setclassid Adapter [ClassID]]
主要参数:
- /all:显示所有网卡的详细TCP/IP配置信息
- /renew:更新指定网卡的DHCP配置
- /release:释放指定网卡的DHCP配置
- /flushdns:清空DNS解析器缓存
- /displaydns:显示DNS解析器缓存内容
- /registerdns:刷新所有DHCP租约并重新注册DNS名称
使用示例:
# 显示基本网络配置
ipconfig
# 显示详细网络配置
ipconfig /all
# 释放并重新获取IP地址
ipconfig /release
ipconfig /renew
# 清空DNS缓存
ipconfig /flushdns
6.1.2 ping命令
功能:通过发送ICMP回声请求报文检验与目标主机的连接。
语法:
ping [-t] [-a] [-n Count] [-l Size] [-f] [-i TTL] [-v TOS]
[-r Count] [-s Count] [{-j HostList | -k HostList}]
[-w Timeout] [TargetName]
主要参数:
- -t:持续ping直到停止
- -a:将地址解析为主机名
- -n Count:发送Count个回声请求
- -l Size:发送缓冲区大小
- -f:设置"不分段"标志
- -i TTL:设置生存时间
- -w Timeout:等待超时时间(毫秒)
使用示例:
# 基本ping测试
ping www.baidu.com
# 持续ping
ping -t 192.168.1.1
# 发送10个大包
ping -n 10 -l 1024 192.168.1.1
# 设置超时时间
ping -w 3000 192.168.1.1
6.1.3 arp命令
功能:显示和修改地址解析协议(ARP)缓存表。
语法:
arp [-a[InetAddr] [-N IfaceAddr]] [-g [InetAddr][-N IfaceAddr]]
[-d InetAddr[IfaceAddr]] [-s InetAddr EtherAddr[IfaceAddr]]
主要参数:
- -a:显示所有ARP表项
- -g:与-a相同
- -d:删除指定的ARP表项
- -s:添加静态ARP表项
- -N:显示指定网络接口的ARP表项
使用示例:
# 显示所有ARP表项
arp -a
# 显示指定IP的ARP表项
arp -a 192.168.1.1
# 删除ARP表项
arp -d 192.168.1.1
# 添加静态ARP表项
arp -s 192.168.1.100 00-11-22-33-44-55
6.1.4 netstat命令
功能:显示网络连接、路由表和网络接口统计信息。
语法:
netstat [-a] [-e] [-n] [-o] [-p Protocol] [-r] [-s] [Interval]
主要参数:
- -a:显示所有连接和监听端口
- -e:显示以太网统计信息
- -n:以数字形式显示地址和端口
- -o:显示进程ID
- -p Protocol:显示指定协议的连接
- -r:显示路由表
- -s:显示协议统计信息
使用示例:
# 显示所有网络连接
netstat -a
# 显示TCP连接及进程ID
netstat -ano -p tcp
# 显示路由表
netstat -r
# 显示网络统计信息
netstat -s
6.1ujh666666666666666666666666666666666666666666.5 tracert命令
功能:跟踪数据包到达目标的路径,显示每个路由器的IP地址和响应时间。
语法:
tracert [-d] [-h MaximumHops] [-j HostList] [-w Timeout] [TargetName]
主要参数:
- -d:不解析地址为主机名
- -h MaximumHops:最大跳数
- -j HostList:松散源路由
- -w Timeout:等待超时时间
使用示例:
# 基本路由跟踪
tracert www.baidu.com
# 不解析主机名
tracert -d 8.8.8.8
# 设置最大跳数
tracert -h 15 www.google.com
6.1.6 pathping 命令
功能:
结合 tracert 和 ping 的功能,统计路径中每个节点的延迟和丢包率。
语法:
pathping [-n] [-h MaxHops] [-w Timeout] [Target]
参数:
-n:不解析主机名。-h MaxHops:最大跳数。-w Timeout:超时时间。
示例:
pathping www.google.com # 分析路径延迟和丢包
pathping -n 192.168.1.1 # 禁用主机名解析
输出解析:
- 先显示类似
tracert的路径。 - 对每个节点进行统计(延迟、丢包率)。
6.1.7 route 命令
功能:
管理本地 IP 路由表,查看、添加、删除或修改路由规则。
常用子命令:
route print # 显示当前路由表
route add 目标网络 mask 子网掩码 网关 # 添加静态路由
route delete 目标网络 # 删除路由
示例:
route print # 查看路由表
route add 10.0.0.0 mask 255.0.0.0 192.168.1.1 # 添加静态路由
route delete 10.0.0.0 # 删除路由
应用场景:
多网卡环境、VPN 路由配置、跨子网通信。
6.1.8 netsh 命令
功能:
强大的网络配置工具,可管理接口、防火墙、路由、NAT 等。
常用场景:
-
查看/设置IP地址:
textnetsh interface ip show config # 显示配置 netsh interface ip set address "以太网" static 192.168.1.100 255.255.255.0 192.168.1.1 -
防火墙管理:
textnetsh advfirewall show allprofiles # 查看防火墙状态 netsh advfirewall set allprofiles state off # 关闭防火墙 -
Wi-Fi 管理:
textnetsh wlan show profiles # 显示保存的Wi-Fi配置 netsh wlan connect name="SSID" # 连接指定Wi-Fi
6.1.9 nslookup 命令
功能:
查询 DNS 记录,诊断域名解析问题。
语法:
nslookup [域名/IP] [DNS服务器]
示例:
nslookup www.baidu.com # 查询A记录
nslookup -type=mx example.com # 查询MX记录(邮件服务器)
nslookup 8.8.8.8 # 反向DNS查询
交互模式:
nslookup
> server 8.8.8.8 # 切换DNS服务器
> set type=AAAA # 查询IPv6记录
> www.google.com
6. 1.10net 命令
功能:
多功能网络管理工具,涵盖用户、共享、服务等。
常用子命令:
-
用户管理:
textnet user # 查看用户列表 net user John 123456 /add # 创建用户 -
共享资源:
textnet share # 查看共享文件夹 net share Data=C:\Data /grant:Everyone,READ # 创建共享 -
服务管理:
textnet start # 查看运行中的服务 net stop "Windows Update" # 停止服务 -
网络连接:
textnet use # 查看映射的驱动器 net use Z: \\192.168.1.100\Share # 映射网络驱动器
6.2 网络监视工具
6.2.1 网络监听原理
混杂模式(Promiscuous Mode):
- 正常模式下,网卡只接收发送给自己的数据包和广播包
- 混杂模式下,网卡接收所有经过的数据包
- 以太网采用广播通信,同一冲突域中的所有端口都能看到数据包
监听检测方法:
- 时延检测:混杂模式主机处理大量分组,响应变慢
- 错误MAC地址测试:发送错误MAC地址但正确IP地址的ping包
安全风险:
- 可以截获未加密的协议数据(FTP、Telnet等)
- 可能被用于网络入侵和信息窃取
- 常用于ARP欺骗等恶意攻击
6.2.2 网络嗅探器(Sniffer)
Sniffer功能组件:
- 监视:实时解码并显示网络通信流
- 捕获:抓取并保存网络数据包
- 分析:利用专家系统分析网络问题
- 显示:以统计表或图形方式显示数据
监控信息类型:
- 负载统计:帧数、字节数、网络利用率、广播/组播计数
- 出错统计:CRC错误、冲突碎片、超长帧、对准错误
- 协议统计:按底层协议分类的统计数据
- 应用统计:响应时间和相关统计
- 会话统计:单个工作站或会话组的通信量
- 包大小统计:不同大小数据包的分布
6.2.3 编程实现网络监听工具
import socket
import struct
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
from collections import defaultdict, deque
@dataclass
class PacketInfo:
"""数据包信息"""
timestamp: float
src_mac: str
dst_mac: str
src_ip: str
dst_ip: str
protocol: str
size: int
src_port: Optional[int] = None
dst_port: Optional[int] = None
class NetworkSniffer:
"""网络嗅探器"""
def __init__(self, interface: str = None):
self.interface = interface
self.running = False
self.packets = deque(maxlen=10000) # 保存最近10000个包
self.statistics = defaultdict(int)
self.protocol_stats = defaultdict(int)
self.size_stats = defaultdict(int)
self.lock = threading.Lock()
# 创建原始套接字
try:
self.socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
if interface:
self.socket.bind((interface, 0))
except PermissionError:
raise PermissionError("需要管理员权限来创建原始套接字")
except Exception as e:
raise Exception(f"创建套接字失败: {e}")
def start_capture(self):
"""开始捕获"""
self.running = True
self.capture_thread = threading.Thread(target=self._capture_loop)
self.capture_thread.start()
def stop_capture(self):
"""停止捕获"""
self.running = False
if hasattr(self, 'capture_thread'):
self.capture_thread.join()
self.socket.close()
def _capture_loop(self):
"""捕获循环"""
while self.running:
try:
# 接收数据包
packet, addr = self.socket.recvfrom(65535)
timestamp = time.time()
# 解析数据包
packet_info = self._parse_packet(packet, timestamp)
if packet_info:
with self.lock:
self.packets.append(packet_info)
self._update_statistics(packet_info)
except Exception as e:
if self.running:
print(f"捕获数据包时出错: {e}")
break
def _parse_packet(self, packet: bytes, timestamp: float) -> Optional[PacketInfo]:
"""解析数据包"""
try:
# 解析以太网头部
eth_header = packet[:14]
eth = struct.unpack('!6s6sH', eth_header)
dst_mac = ':'.join(['%02x' % b for b in eth[0]])
src_mac = ':'.join(['%02x' % b for b in eth[1]])
eth_type = eth[2]
# 只处理IP数据包
if eth_type != 0x0800:
return None
# 解析IP头部
ip_header = packet[14:34]
ip = struct.unpack('!BBHHHBBH4s4s', ip_header)
version = ip[0] >> 4
ihl = ip[0] & 0xF
protocol = ip[6]
src_ip = socket.inet_ntoa(ip[8])
dst_ip = socket.inet_ntoa(ip[9])
# 解析传输层协议
src_port = dst_port = None
protocol_name = 'Unknown'
if protocol == 6: # TCP
protocol_name = 'TCP'
tcp_header = packet[34:54]
tcp = struct.unpack('!HHLLBBHHH', tcp_header)
src_port = tcp[0]
dst_port = tcp[1]
elif protocol == 17: # UDP
protocol_name = 'UDP'
udp_header = packet[34:42]
udp = struct.unpack('!HHHH', udp_header)
src_port = udp[0]
dst_port = udp[1]
elif protocol == 1: # ICMP
protocol_name = 'ICMP'
return PacketInfo(
timestamp=timestamp,
src_mac=src_mac,
dst_mac=dst_mac,
src_ip=src_ip,
dst_ip=dst_ip,
protocol=protocol_name,
size=len(packet),
src_port=src_port,
dst_port=dst_port
)
except Exception as e:
return None
def _update_statistics(self, packet: PacketInfo):
"""更新统计信息"""
self.statistics['total_packets'] += 1
self.statistics['total_bytes'] += packet.size
self.protocol_stats[packet.protocol] += 1
# 按大小分类
if packet.size <= 64:
self.size_stats['64'] += 1
elif packet.size <= 128:
self.size_stats['65-128'] += 1
elif packet.size <= 256:
self.size_stats['129-256'] += 1
elif packet.size <= 512:
self.size_stats['257-512'] += 1
elif packet.size <= 1024:
self.size_stats['513-1024'] += 1
else:
self.size_stats['1024+'] += 1
# 检查广播和组播
if packet.dst_mac.startswith('ff:ff:ff'):
self.statistics['broadcast_packets'] += 1
elif packet.dst_mac.startswith('01:00:5e'):
self.statistics['multicast_packets'] += 1
def get_statistics(self) -> dict:
"""获取统计信息"""
with self.lock:
return {
'general': dict(self.statistics),
'protocols': dict(self.protocol_stats),
'sizes': dict(self.size_stats),
'packet_count': len(self.packets)
}
def get_recent_packets(self, count: int = 100) -> List[PacketInfo]:
"""获取最近的数据包"""
with self.lock:
return list(self.packets)[-count:]
def filter_packets(self, **filters) -> List[PacketInfo]:
"""过滤数据包"""
with self.lock:
filtered = []
for packet in self.packets:
match = True
if 'src_ip' in filters and packet.src_ip != filters['src_ip']:
match = False
if 'dst_ip' in filters and packet.dst_ip != filters['dst_ip']:
match = False
if 'protocol' in filters and packet.protocol != filters['protocol']:
match = False
if 'src_port' in filters and packet.src_port != filters['src_port']:
match = False
if 'dst_port' in filters and packet.dst_port != filters['dst_port']:
match = False
if match:
filtered.append(packet)
return filtered
def get_top_talkers(self, count: int = 10) -> List[tuple]:
"""获取流量最大的主机"""
ip_stats = defaultdict(int)
with self.lock:
for packet in self.packets:
ip_stats[packet.src_ip] += packet.size
ip_stats[packet.dst_ip] += packet.size
return sorted(ip_stats.items(), key=lambda x: x[1], reverse=True)[:count]
def get_protocol_distribution(self) -> dict:
"""获取协议分布"""
with self.lock:
total = sum(self.protocol_stats.values())
if total == 0:
return {}
return {proto: (count / total) * 100
for proto, count in self.protocol_stats.items()}
class NetworkAnalyzer:
"""网络分析器"""
def __init__(self, sniffer: NetworkSniffer):
self.sniffer = sniffer
def analyze_traffic_patterns(self) -> dict:
"""分析流量模式"""
packets = self.sniffer.get_recent_packets(1000)
if not packets:
return {}
# 时间分布分析
time_buckets = defaultdict(int)
for packet in packets:
bucket = int(packet.timestamp) // 60 # 按分钟分组
time_buckets[bucket] += 1
# 流量方向分析
internal_ips = set()
external_ips = set()
for packet in packets:
if packet.src_ip.startswith('192.168.') or packet.src_ip.startswith('10.'):
internal_ips.add(packet.src_ip)
else:
external_ips.add(packet.src_ip)
if packet.dst_ip.startswith('192.168.') or packet.dst_ip.startswith('10.'):
internal_ips.add(packet.dst_ip)
else:
external_ips.add(packet.dst_ip)
return {
'time_distribution': dict(time_buckets),
'internal_hosts': len(internal_ips),
'external_hosts': len(external_ips),
'total_packets': len(packets)
}
def detect_anomalies(self) -> List[dict]:
"""检测异常"""
anomalies = []
stats = self.sniffer.get_statistics()
# 检测异常高的流量
if stats['general'].get('total_packets', 0) > 10000:
anomalies.append({
'type': 'high_traffic',
'description': '检测到异常高的数据包数量',
'value': stats['general']['total_packets']
})
# 检测异常的协议分布
protocol_dist = self.sniffer.get_protocol_distribution()
if protocol_dist.get('ICMP', 0) > 50:
anomalies.append({
'type': 'icmp_flood',
'description': 'ICMP流量异常高',
'value': f"{protocol_dist['ICMP']:.1f}%"
})
return anomalies
# 使用示例(需要管理员权限)
def demo_network_sniffer():
"""网络嗅探器演示"""
try:
# 创建嗅探器
sniffer = NetworkSniffer()
analyzer = NetworkAnalyzer(sniffer)
print("开始网络监听...")
sniffer.start_capture()
# 运行一段时间
time.sleep(30)
# 获取统计信息
stats = sniffer.get_statistics()
print(f"\n统计信息:")
print(f" 总数据包: {stats['general'].get('total_packets', 0)}")
print(f" 总字节数: {stats['general'].get('total_bytes', 0)}")
print(f" 广播包: {stats['general'].get('broadcast_packets', 0)}")
print(f"\n协议分布:")
for proto, count in stats['protocols'].items():
print(f" {proto}: {count}")
# 获取流量最大的主机
top_talkers = sniffer.get_top_talkers(5)
print(f"\n流量最大的主机:")
for ip, bytes_count in top_talkers:
print(f" {ip}: {bytes_count} 字节")
# 分析流量模式
patterns = analyzer.analyze_traffic_patterns()
print(f"\n流量分析:")
print(f" 内部主机: {patterns.get('internal_hosts', 0)}")
print(f" 外部主机: {patterns.get('external_hosts', 0)}")
# 检测异常
anomalies = analyzer.detect_anomalies()
if anomalies:
print(f"\n检测到异常:")
for anomaly in anomalies:
print(f" {anomaly['description']}: {anomaly['value']}")
# 停止监听
sniffer.stop_capture()
print("\n监听已停止")
except PermissionError:
print("错误: 需要管理员权限来运行网络嗅探器")
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
demo_network_sniffer()
6.3 网络管理平台
现代网络管理需要集成化的管理平台,提供统一的监控、配置和故障管理功能。
6.3.1 网络管理平台架构
import asyncio
import json
import sqlite3
import time
from dataclasses import dataclass, asdict
from typing import Dict, List, Optional, Callable
from enum import Enum
class DeviceStatus(Enum):
UP = "up"
DOWN = "down"
WARNING = "warning"
UNKNOWN = "unknown"
@dataclass
class NetworkDevice:
"""网络设备"""
id: str
name: str
ip_address: str
device_type: str
location: str
status: DeviceStatus
last_seen: float
snmp_community: str = "public"
snmp_port: int = 161
@dataclass
class MonitoringMetric:
"""监控指标"""
device_id: str
metric_name: str
value: float
unit: str
timestamp: float
threshold_warning: Optional[float] = None
threshold_critical: Optional[float] = None
class NetworkManagementPlatform:
"""网络管理平台"""
def __init__(self, db_path: str = "network_management.db"):
self.db_path = db_path
self.devices = {}
self.metrics = []
self.alerts = []
self.monitoring_tasks = {}
self.event_handlers = {}
# 初始化数据库
self._init_database()
def _init_database(self):
"""初始化数据库"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建设备表
cursor.execute('''
CREATE TABLE IF NOT EXISTS devices (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
ip_address TEXT NOT NULL,
device_type TEXT NOT NULL,
location TEXT,
status TEXT NOT NULL,
last_seen REAL,
snmp_community TEXT,
snmp_port INTEGER,
created_at REAL DEFAULT (julianday('now'))
)
''')
# 创建指标表
cursor.execute('''
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
metric_name TEXT NOT NULL,
value REAL NOT NULL,
unit TEXT,
timestamp REAL NOT NULL,
threshold_warning REAL,
threshold_critical REAL,
FOREIGN KEY (device_id) REFERENCES devices (id)
)
''')
# 创建告警表
cursor.execute('''
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT NOT NULL,
alert_type TEXT NOT NULL,
severity TEXT NOT NULL,
message TEXT NOT NULL,
timestamp REAL NOT NULL,
acknowledged BOOLEAN DEFAULT FALSE,
resolved BOOLEAN DEFAULT FALSE,
FOREIGN KEY (device_id) REFERENCES devices (id)
)
''')
conn.commit()
conn.close()
def add_device(self, device: NetworkDevice):
"""添加设备"""
self.devices[device.id] = device
# 保存到数据库
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO devices
(id, name, ip_address, device_type, location, status, last_seen, snmp_community, snmp_port)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
device.id, device.name, device.ip_address, device.device_type,
device.location, device.status.value, device.last_seen,
device.snmp_community, device.snmp_port
))
conn.commit()
conn.close()
# 启动监控
self.start_device_monitoring(device.id)
def remove_device(self, device_id: str):
"""移除设备"""
if device_id in self.devices:
del self.devices[device_id]
# 停止监控
self.stop_device_monitoring(device_id)
# 从数据库删除
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('DELETE FROM devices WHERE id = ?', (device_id,))
conn.commit()
conn.close()
def get_device(self, device_id: str) -> Optional[NetworkDevice]:
"""获取设备"""
return self.devices.get(device_id)
def get_all_devices(self) -> List[NetworkDevice]:
"""获取所有设备"""
return list(self.devices.values())
def update_device_status(self, device_id: str, status: DeviceStatus):
"""更新设备状态"""
if device_id in self.devices:
old_status = self.devices[device_id].status
self.devices[device_id].status = status
self.devices[device_id].last_seen = time.time()
# 状态变化时生成告警
if old_status != status:
self._generate_status_alert(device_id, old_status, status)
def add_metric(self, metric: MonitoringMetric):
"""添加监控指标"""
self.metrics.append(metric)
# 保存到数据库
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO metrics
(device_id, metric_name, value, unit, timestamp, threshold_warning, threshold_critical)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
metric.device_id, metric.metric_name, metric.value, metric.unit,
metric.timestamp, metric.threshold_warning, metric.threshold_critical
))
conn.commit()
conn.close()
# 检查阈值
self._check_metric_thresholds(metric)
def get_device_metrics(self, device_id: str, metric_name: str = None,
hours: int = 24) -> List[MonitoringMetric]:
"""获取设备指标"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
since_time = time.time() - (hours * 3600)
if metric_name:
cursor.execute('''
SELECT device_id, metric_name, value, unit, timestamp, threshold_warning, threshold_critical
FROM metrics
WHERE device_id = ? AND metric_name = ? AND timestamp > ?
ORDER BY timestamp DESC
''', (device_id, metric_name, since_time))
else:
cursor.execute('''
SELECT device_id, metric_name, value, unit, timestamp, threshold_warning, threshold_critical
FROM metrics
WHERE device_id = ? AND timestamp > ?
ORDER BY timestamp DESC
''', (device_id, since_time))
metrics = []
for row in cursor.fetchall():
metrics.append(MonitoringMetric(
device_id=row[0],
metric_name=row[1],
value=row[2],
unit=row[3],
timestamp=row[4],
threshold_warning=row[5],
threshold_critical=row[6]
))
conn.close()
return metrics
def start_device_monitoring(self, device_id: str):
"""开始设备监控"""
if device_id not in self.monitoring_tasks:
task = asyncio.create_task(self._monitor_device(device_id))
self.monitoring_tasks[device_id] = task
def stop_device_monitoring(self, device_id: str):
"""停止设备监控"""
if device_id in self.monitoring_tasks:
self.monitoring_tasks[device_id].cancel()
del self.monitoring_tasks[device_id]
async def _monitor_device(self, device_id: str):
"""监控设备"""
while True:
try:
device = self.devices.get(device_id)
if not device:
break
# 执行ping测试
ping_success = await self._ping_device(device.ip_address)
if ping_success:
self.update_device_status(device_id, DeviceStatus.UP)
# 收集SNMP指标
await self._collect_snmp_metrics(device)
else:
self.update_device_status(device_id, DeviceStatus.DOWN)
# 等待下次监控
await asyncio.sleep(60) # 每分钟监控一次
except asyncio.CancelledError:
break
except Exception as e:
print(f"监控设备 {device_id} 时出错: {e}")
await asyncio.sleep(60)
async def _ping_device(self, ip_address: str) -> bool:
"""Ping设备"""
try:
process = await asyncio.create_subprocess_exec(
'ping', '-n', '1', '-w', '3000', ip_address,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
return process.returncode == 0
except Exception:
return False
async def _collect_snmp_metrics(self, device: NetworkDevice):
"""收集SNMP指标"""
try:
# 这里应该实现实际的SNMP查询
# 为演示目的,我们生成模拟数据
import random
timestamp = time.time()
# CPU利用率
cpu_usage = random.uniform(10, 90)
self.add_metric(MonitoringMetric(
device_id=device.id,
metric_name="cpu_usage",
value=cpu_usage,
unit="%",
timestamp=timestamp,
threshold_warning=80.0,
threshold_critical=95.0
))
# 内存利用率
memory_usage = random.uniform(20, 85)
self.add_metric(MonitoringMetric(
device_id=device.id,
metric_name="memory_usage",
value=memory_usage,
unit="%",
timestamp=timestamp,
threshold_warning=85.0,
threshold_critical=95.0
))
# 接口流量
interface_traffic = random.uniform(1000000, 100000000)
self.add_metric(MonitoringMetric(
device_id=device.id,
metric_name="interface_traffic",
value=interface_traffic,
unit="bps",
timestamp=timestamp,
threshold_warning=80000000.0,
threshold_critical=95000000.0
))
except Exception as e:
print(f"收集SNMP指标时出错: {e}")
def _check_metric_thresholds(self, metric: MonitoringMetric):
"""检查指标阈值"""
if metric.threshold_critical and metric.value >= metric.threshold_critical:
self._generate_alert(
metric.device_id,
"threshold_critical",
"critical",
f"{metric.metric_name} 达到严重阈值: {metric.value} {metric.unit}"
)
elif metric.threshold_warning and metric.value >= metric.threshold_warning:
self._generate_alert(
metric.device_id,
"threshold_warning",
"warning",
f"{metric.metric_name} 达到警告阈值: {metric.value} {metric.unit}"
)
def _generate_status_alert(self, device_id: str, old_status: DeviceStatus, new_status: DeviceStatus):
"""生成状态变化告警"""
if new_status == DeviceStatus.DOWN:
severity = "critical"
message = f"设备从 {old_status.value} 变为 {new_status.value}"
elif new_status == DeviceStatus.UP and old_status == DeviceStatus.DOWN:
severity = "info"
message = f"设备恢复正常: 从 {old_status.value} 变为 {new_status.value}"
else:
severity = "warning"
message = f"设备状态变化: 从 {old_status.value} 变为 {new_status.value}"
self._generate_alert(device_id, "status_change", severity, message)
def _generate_alert(self, device_id: str, alert_type: str, severity: str, message: str):
"""生成告警"""
alert = {
'device_id': device_id,
'alert_type': alert_type,
'severity': severity,
'message': message,
'timestamp': time.time(),
'acknowledged': False,
'resolved': False
}
self.alerts.append(alert)
# 保存到数据库
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO alerts
(device_id, alert_type, severity, message, timestamp, acknowledged, resolved)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
device_id, alert_type, severity, message,
alert['timestamp'], alert['acknowledged'], alert['resolved']
))
conn.commit()
conn.close()
# 触发事件处理器
self._trigger_event_handlers('alert_generated', alert)
def get_alerts(self, device_id: str = None, severity: str = None,
unresolved_only: bool = True) -> List[dict]:
"""获取告警"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = 'SELECT * FROM alerts WHERE 1=1'
params = []
if device_id:
query += ' AND device_id = ?'
params.append(device_id)
if severity:
query += ' AND severity = ?'
params.append(severity)
if unresolved_only:
query += ' AND resolved = FALSE'
query += ' ORDER BY timestamp DESC'
cursor.execute(query, params)
alerts = []
for row in cursor.fetchall():
alerts.append({
'id': row[0],
'device_id': row[1],
'alert_type': row[2],
'severity': row[3],
'message': row[4],
'timestamp': row[5],
'acknowledged': bool(row[6]),
'resolved': bool(row[7])
})
conn.close()
return alerts
def acknowledge_alert(self, alert_id: int):
"""确认告警"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
'UPDATE alerts SET acknowledged = TRUE WHERE id = ?',
(alert_id,)
)
conn.commit()
conn.close()
def resolve_alert(self, alert_id: int):
"""解决告警"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute(
'UPDATE alerts SET resolved = TRUE WHERE id = ?',
(alert_id,)
)
conn.commit()
conn.close()
def register_event_handler(self, event_type: str, handler: Callable):
"""注册事件处理器"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
def _trigger_event_handlers(self, event_type: str, data: dict):
"""触发事件处理器"""
if event_type in self.event_handlers:
for handler in self.event_handlers[event_type]:
try:
handler(data)
except Exception as e:
print(f"事件处理器执行失败: {e}")
def get_dashboard_data(self) -> dict:
"""获取仪表板数据"""
total_devices = len(self.devices)
up_devices = len([d for d in self.devices.values() if d.status == DeviceStatus.UP])
down_devices = len([d for d in self.devices.values() if d.status == DeviceStatus.DOWN])
critical_alerts = len([a for a in self.get_alerts() if a['severity'] == 'critical'])
warning_alerts = len([a for a in self.get_alerts() if a['severity'] == 'warning'])
return {
'devices': {
'total': total_devices,
'up': up_devices,
'down': down_devices,
'availability': (up_devices / total_devices * 100) if total_devices > 0 else 0
},
'alerts': {
'critical': critical_alerts,
'warning': warning_alerts,
'total': critical_alerts + warning_alerts
}
}
# 使用示例
async def demo_network_management_platform():
"""网络管理平台演示"""
platform = NetworkManagementPlatform()
# 添加设备
devices = [
NetworkDevice(
id="router-001",
name="核心路由器",
ip_address="192.168.1.1",
device_type="router",
location="机房A",
status=DeviceStatus.UNKNOWN,
last_seen=time.time()
),
NetworkDevice(
id="switch-001",
name="接入交换机",
ip_address="192.168.1.10",
device_type="switch",
location="机房A",
status=DeviceStatus.UNKNOWN,
last_seen=time.time()
),
NetworkDevice(
id="server-001",
name="Web服务器",
ip_address="192.168.1.100",
device_type="server",
location="机房B",
status=DeviceStatus.UNKNOWN,
last_seen=time.time()
)
]
for device in devices:
platform.add_device(device)
# 注册告警处理器
def alert_handler(alert_data):
print(f"[告警] {alert_data['severity'].upper()}: {alert_data['message']}")
platform.register_event_handler('alert_generated', alert_handler)
print("网络管理平台已启动,开始监控设备...")
# 运行一段时间
await asyncio.sleep(30)
# 获取仪表板数据
dashboard = platform.get_dashboard_data()
print(f"\n仪表板数据:")
print(f" 设备总数: {dashboard['devices']['total']}")
print(f" 在线设备: {dashboard['devices']['up']}")
print(f" 离线设备: {dashboard['devices']['down']}")
print(f" 可用性: {dashboard['devices']['availability']:.1f}%")
print(f" 严重告警: {dashboard['alerts']['critical']}")
print(f" 警告告警: {dashboard['alerts']['warning']}")
# 获取设备指标
for device in devices:
metrics = platform.get_device_metrics(device.id, hours=1)
print(f"\n{device.name} 指标 ({len(metrics)} 条):")
for metric in metrics[-3:]: # 显示最近3条
print(f" {metric.metric_name}: {metric.value} {metric.unit}")
# 获取告警
alerts = platform.get_alerts()
print(f"\n当前告警 ({len(alerts)} 条):")
for alert in alerts[-5:]: # 显示最近5条
print(f" [{alert['severity']}] {alert['message']}")
# 停止所有监控任务
for device_id in list(platform.monitoring_tasks.keys()):
platform.stop_device_monitoring(device_id)
if __name__ == "__main__":
asyncio.run(demo_network_management_platform())
6.4 小结
网络管理工具是网络运维的重要支撑,主要知识点包括:
-
Windows管理命令:
- ipconfig:网络配置管理
- ping:连通性测试
- arp:地址解析协议管理
- netstat:网络连接和统计信息
- tracert:路由跟踪
-
网络监视工具:
- 混杂模式监听原理
- 网络嗅探器功能
- 数据包捕获和分析
- 流量统计和异常检测
-
网络管理平台:
- 设备管理和监控
- 指标收集和阈值告警
- 事件处理和通知
- 仪表板和报表
-
编程实践:
- 命令行工具封装
- 网络监听程序
- 管理平台架构
- 异步监控和告警
掌握这些工具和技术对于网络管理人员的日常工作至关重要,能够有效提高网络运维效率和故障处理能力。
附录
附录A 常用网络管理命令速查表
| 命令 | 功能 | 常用参数 | 示例 |
|---|---|---|---|
| ipconfig | 显示和配置TCP/IP参数 | /all, /renew, /release, /flushdns | ipconfig /all |
| ping | 测试网络连通性 | -t, -n, -l, -w | ping -n 4 192.168.1.1 |
| arp | 管理ARP缓存表 | -a, -d, -s | arp -a |
| netstat | 显示网络连接和统计 | -a, -n, -r, -s | netstat -an |
| tracert | 跟踪路由路径 | -d, -h, -w | tracert www.baidu.com |
| nslookup | DNS查询工具 | 交互式/非交互式 | nslookup www.google.com |
附录B ASN.1基本类型对照表
| ASN.1类型 | 标签 | 描述 | 示例 |
|---|---|---|---|
| BOOLEAN | [UNIVERSAL 1] | 布尔值 | TRUE, FALSE |
| INTEGER | [UNIVERSAL 2] | 整数 | 42, -100 |
| BIT STRING | [UNIVERSAL 3] | 位串 | '1010'B |
| OCTET STRING | [UNIVERSAL 4] | 字节串 | 'Hello'H |
| NULL | [UNIVERSAL 5] | 空值 | NULL |
| OBJECT IDENTIFIER | [UNIVERSAL 6] | 对象标识符 | 1.3.6.1.2.1.1.1 |
| SEQUENCE | [UNIVERSAL 16] | 序列 | 有序集合 |
| SET | [UNIVERSAL 17] | 集合 | 无序集合 |
附录C SNMP错误状态码
| 错误码 | 名称 | 描述 |
|---|---|---|
| 0 | noError | 无错误 |
| 1 | tooBig | 响应消息太大 |
| 2 | noSuchName | 变量名不存在 |
| 3 | badValue | 变量值错误 |
| 4 | readOnly | 变量只读 |
| 5 | genErr | 一般错误 |
附录D MIB-2主要组对象标识符
| 组名 | OID | 描述 |
|---|---|---|
| system | 1.3.6.1.2.1.1 | 系统组 |
| interfaces | 1.3.6.1.2.1.2 | 接口组 |
| at | 1.3.6.1.2.1.3 | 地址转换组 |
| ip | 1.3.6.1.2.1.4 | IP组 |
| icmp | 1.3.6.1.2.1.5 | ICMP组 |
| tcp | 1.3.6.1.2.1.6 | TCP组 |
| udp | 1.3.6.1.2.1.7 | UDP组 |
| egp | 1.3.6.1.2.1.8 | EGP组 |
| transmission | 1.3.6.1.2.1.10 | 传输组 |
| snmp | 1.3.6.1.2.1.11 | SNMP组 |
附录E RMON功能组概览
| 组号 | 组名 | 功能描述 | 依赖关系 |
|---|---|---|---|
| 1 | statistics | 以太网统计信息 | 无 |
| 2 | history | 历史数据收集 | 无 |
| 3 | alarm | 阈值告警 | 需要event组 |
| 4 | hosts | 主机统计 | 无 |
| 5 | hostTopN | 最高N台主机 | 需要hosts组 |
| 6 | matrix | 主机对通信矩阵 | 无 |
| 7 | filter | 数据包过滤 | 无 |
| 8 | capture | 数据包捕获 | 需要filter组 |
| 9 | event | 事件管理 | 无 |
| 10 | tokenRing | 令牌环统计 | 无 |
参考文献
- RFC 1157 - Simple Network Management Protocol (SNMP)
- RFC 1213 - Management Information Base for Network Management of TCP/IP-based internets: MIB-II
- RFC 1271 - Remote Network Monitoring Management Information Base
- RFC 3411 - An Architecture for Describing Simple Network Management Protocol (SNMP) Management Frameworks
- ITU-T X.680 - Information technology - Abstract Syntax Notation One (ASN.1): Specification of basic notation
- ITU-T X.690 - Information technology - ASN.1 encoding rules: Specification of Basic Encoding Rules (BER)
学习建议
理论学习要点
- 掌握基本概念:深入理解网络管理的基本概念、目标和架构
- 熟悉协议标准:重点掌握SNMP协议族和相关RFC标准
- 理解数据结构:掌握ASN.1语法和MIB结构定义
- 学习监控技术:了解RMON远程监控的原理和应用
实践操作建议
- 命令行工具:熟练使用Windows网络管理命令
- 编程实践:通过Python等语言实现SNMP客户端
- 工具使用:学会使用网络监控和分析工具
- 故障排除:培养网络故障诊断和解决能力
考试复习策略
- 知识点梳理:按章节系统复习,重点掌握核心概念
- 代码理解:理解示例代码的实现原理和应用场景
- 实际应用:结合实际网络环境理解管理需求
- 综合练习:通过综合案例加深理解
22级网工《计算机网络管理》期末考样题
一、单项选择题
-
ISO定义的网络管理功能中,()包括的功能有风险分析、网管系统保护等。 A
A、计费管理
B、配置管理
C、性能管理
D、安全管理 -
SNMP的MIB-2中含有计量器类型,对计量器Gauge正确的叙述是 C A. Gauge的最大值可以设置为小于2^32的任意正整数
B. 计量器达到最大值后保持锁定不变
C. Gauge只有达到2^32-1后才会自动减少
D. Gauge的最大值总是2^32
二、填空题
- 现代网络管理系统是由以下4个要素组成:网络管理者(站)、管理代理,网络管理协议及管理信息库MIB等。
三、判断题
- SNMPv2增加了管理站之间的通信机制,为此引入了通知报文和MIB-2数据库(F )
填入 T/F(True/False)
四、简答题
- SNMPv1规定了哪些协议数据单元?简述它们的作用?SNMPv2又增加了哪些协议数据单元?
五、综合分析题
- 表5-1是一个简化的路由表,图5-1是MIB-2的Ip组。其中只有一个索引对象ipRouteDest,请在表5-2中填入路由表对象及其实例的词典顺序。
表5-1 路由表 ipRouteTable(1.3.6.1.2.1.4.21)
ipRouteEntry(1.3.6.1.2.1.4.21.1=x)
ipRouteDest(1)
……
ipRouteMetricl(3)
……
ipRouteNextHop(7)
| ipRouteDest | ipRouteMetric1 | ipRouteNextHop |
|---|---|---|
| 10.10.10.10 | 4 | 9.9.9.9 |
| 11.11.11.11 | 5 | 8.8.8.8 |
表5-2 路由表对象实例
| 对象 | 对象标识符 | 下一对象实例 |
|---|---|---|
好的,以下是一个与原题类似的MIB-2路由表题目,但我做了 IP 地址和数据内容的修改,考察点相同,让你自行完成。
🧩 题目:补全下表中“下一对象实例”列
请根据下表中的路由表信息,填写表格中“下一对象实例”列。
注意:MIB-2 路由表中对象索引为 ipRouteDest,遍历顺序按 IP 字典序。
📘 路由条目表(图示)
| ipRouteDest | ipRouteMetric1 | ipRouteNextHop |
|---|---|---|
| 192.168.1.1 | 2 | 10.0.0.1 |
| 172.16.0.1 | 5 | 10.0.0.2 |
📋 表格结构如下(请你完成“下一对象实例”列):
| 对象 | 对象标识符 | 下一对象实例 |
|---|---|---|
| ipRouteDest | 1.3.6.1.2.1.4.21 | 1.3.6.1.2.1.4.21.192.168.1.1 |
| ipRouteEntry | 1.3.6.1.2.1.4.21.1 | 1.3.6.1.2.1.4.21.1.192.168.1.1 |
| ipRouteDest | 1.3.6.1.2.1.4.21.1.1 | 1.3.6.1.2.1.4.21.1.1.192.168.1.1 |
| ipRouteMetric1 | 1.3.6.1.2.1.4.21.1.3 | 1.3.6.1.2.1.4.21.1.3.192.168.1.1 |
| ipRouteNextHop | 1.3.6.1.2.1.4.21.1.7 | 1.3.6.1.2.1.4.21.1.7.192.168.1.1 |
| ipRouteDest | 1.3.6.1.2.1.4.21.1.1 | 1.3.6.1.2.1.4.21.1.1.172.16.0.1 |
| ipRouteMetric1 | 1.3.6.1.2.1.4.21.1.3 | 1.3.6.1.2.1.4.21.1.3.172.16.0.1 |
| ipRouteNextHop | 1.3.6.1.2.1.4.21.1.7 | 1.3.6.1.2.1.4.21.1.3.172.16.0.1 |
🧠 提示
-
比较 IP:172.16.0.1 vs 192.168.1.1 → 谁小谁排前
-
“下一对象实例” = 对象标识符 + 索引(即 ipRouteDest 地址)
-
IP 地址要写成
.172.16.0.1这样的后缀
📌 要求:
请你完成“下一对象实例”这一列,并确保结构正确、顺序合理。
完成后我可以帮你对答案、讲解原因。
准备好了吗?你可以直接填写表格或者给出每一行的完整实例路径~
网络管理MIB结构练习题
题目要求
根据以下MIB结构示意图,回答相关问题:
ifTable (1.3.6.1.2.1.2.2)
|
+-- ifEntry (1.3.6.1.2.1.2.2.1=x)
|
+-- ifIndex(1) [接口索引号]
+-- ifDescr(2) [接口描述]
+-- ifType(3) [接口类型]
+-- ifMtu(4) [最大传输单元]
+-- ifSpeed(5) [接口速率]
+-- ifPhysAddress(6) [物理地址]
+-- ifAdminStatus(7) [管理状态]
+-- ifOperStatus(8) [操作状态]
+-- ifLastChange(9) [最后状态改变时间]
+-- ifInOctets(10) [输入字节数]
+-- ifOutOctets(16) [输出字节数]
问题
-
请写出查询接口索引为3的接口描述(ifDescr)的完整OID
-
1.3.6.1.2.1.2.2.1.3.1
-
如果要获取接口索引为5的操作状态(ifOperStatus),应该使用什么OID? 1.3.6.1.2.1.2.2.1.5.8
-
假设设备有两个接口:
- 接口1:ifIndex=1, ifDescr="GigabitEthernet0/1", ifOperStatus=1(up)
- 接口2:ifIndex=2, ifDescr="FastEthernet0/2", ifOperStatus=2(down)
请按词典顺序列出这两个接口所有对象实例的OID
-
解释ifAdminStatus和ifOperStatus的区别
-
如果要监控所有接口的输入流量,应该定期查询哪些OID?
提示
- 词典顺序规则:先按OID数字顺序排列,相同OID按实例索引排序
- 接口状态:1=up, 2=down, 3=testing
- 对象实例OID格式:表OID.列号.实例索引