一、报告任务

对于一个通信数据包,文件中每一行为:X,Y,代表 IP 为 X 的主机发向 IP 为 Y 的主机的一个数据包。使用 Count-Min Sketch 方法识别 Top-10 频繁通信主机对。


二、算法原理

Count-Min Sketch 算法用于解决大数据统计难题。算法的特点是:不存储所有的不同元素,只存储它们的 Sketch 计数。基本思路如下:

  1. 创建一个长度为 m 的数组,用来计数,初始化每个元素的计数值为 0。
  2. 对于新来的元素,哈希到 0 到 m 之间的一个数,比如哈希值为 i,作为数组的位置索引。
  3. 数组对应的位置索引 i 的计数值加 1。
  4. 查询某个元素出现的频率时,返回该元素哈希后对应的数组位置索引的计数值即可。

考虑到使用哈希会有冲突,即不同的元素哈希到同一个数组的位置索引,这样频率的统计都会偏大。可以使用多个数组和多个哈希函数来计算一个元素对应的数组位置索引;查询某个元素的频率时,返回该元素在不同数组中的计数值中的最小值即可。

Count-Min Sketch 算法的特点是:

  • 只会估算偏大,永远不会偏小。
  • 只需要固定大小的内存和计算时间,与需要统计的元素数量无关。
  • 对于低频的元素,估算值相对的错误可能会很大。

三、参数选取

  1. 哈希函数的选取:依据对象中的成员变量的值,依照一定的规则计算出一个整数作为哈希值。哈希值最重要的两个属性是:

    1. 假设 a.equals(b),那么 a.hashCode() == b.hashCode()
    2. 理想状况下,假设 !a.equals(b),那么 a.hashCode() != b.hashCode()

    本报告中取用的 6 个哈希函数为:

    hash1 = (a * b) % c
    hash2 = (a + b) % c
    hash3 = abs(a - b) % c
    hash4 = (a | b) % c
    hash5 = (a ^ b) % c
    hash6 = (a & b) % c
    
  2. 哈希表宽度:总的键值对个数为 18866027,数量级为 10^7。一般来说,选择哈希表宽度越宽,内存占用越多,但统计越准确。经过检验,在使用 6 个哈希函数的情况下,选择哈希表宽度数量级最小为 10^3,能够节省内存且对结果影响不明显。


四、计算结果

经过计算,TOP10 频繁通信主机对如下表所示:

X 主机 IP Y 主机 IP 通信次数
647067801 1281048150 89002
1148279497 1767542340 83314
3309037651 2254759 79542
352061493 1281048150 61993
3986324625 1281048150 60681
4196430673 1281048150 58824
4033332886 1281048150 57050
1191936803 1714489407 56650
1381261349 1654615343 56172
23675731 1281048150 52888

使用直接计数方法统计出 TOP10 频繁通信主机对,并与 Count-Min Sketch 进行对比。在同样使用 6 个哈希函数的情况下,不同哈希表宽度频率统计误差如下图所示。

由上图可见,当哈希函数选择 6 个时,哈希表宽度为 10^4 数量级最合适。


五、程序

# 创建 6 个哈希函数
def hash1(a, b, c):
    return (a * b) % c

def hash2(a, b, c):
    return (a + b) % c

def hash3(a, b, c):
    return abs(a - b) % c

def hash4(a, b, c):
    return (a | b) % c

def hash5(a, b, c):
    return (a ^ b) % c

def hash6(a, b, c):
    return (a & b) % c

def creat_Hash_label(m, k):
    # 创建初始哈希表,m 为哈希表宽度,k 为哈希函数个数
    return [[0] * m for _ in range(k)]

def record_frequency(Hash_label, data):
    n = len(data)
    # 利用哈希表统计频数
    for i in range(n):
        a, b = map(int, data[i].split())
        Hash_label[0][hash1(a, b, m)] += 1
        Hash_label[1][hash2(a, b, m)] += 1
        Hash_label[2][hash3(a, b, m)] += 1
        Hash_label[3][hash4(a, b, m)] += 1
        Hash_label[4][hash5(a, b, m)] += 1
        Hash_label[5][hash6(a, b, m)] += 1
    return Hash_label

def count_frequency(Hash_label):
    Top10_frequency = [0] * 10
    Top_XY = {}
    n = len(data)

    for i in range(n):
        if i % 1000000 == 0:
            print('已完成:%.2f' % ((i / 20000000) + 0.1))
        a, b = map(int, data[i].split())
        min_max = min(
            Hash_label[0][hash1(a, b, m)],
            Hash_label[1][hash2(a, b, m)],
            Hash_label[2][hash3(a, b, m)],
            Hash_label[3][hash4(a, b, m)],
            Hash_label[4][hash5(a, b, m)],
            Hash_label[5][hash6(a, b, m)]
        )
        str_temp = data[i]
        if min_max > Top10_frequency[9] and (str_temp not in Top_XY):
            Top_XY[str_temp] = min_max
            Top10_frequency[9] = min_max
            Top10_frequency.sort(reverse=True)

    Top_XY = sorted(Top_XY.items(), key=lambda item: item[1], reverse=True)
    Top10_XY = {items[0]: items[1] for items in Top_XY[:10]}
    return Top10_XY

# 主程序
with open('equinix-chicago.dirA.20140320-130000.UTC.anon.pcap.flow.txt', 'r') as file:
    data = file.readlines()

m = 1880341  # 哈希表宽度
init_Hash_label = creat_Hash_label(m, 6)  # 生成初始哈希表
Hash_label = record_frequency(init_Hash_label, data)  # 将数据统计到哈希表中
Top10_XY = count_frequency(Hash_label)  # 统计频繁通信主机对
print(Top10_XY)
Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐