-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbloom.py
More file actions
140 lines (108 loc) · 3.44 KB
/
Copy pathbloom.py
File metadata and controls
140 lines (108 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import argparse
import hashlib
import time
from bitarray import bitarray
# Bloom filter parameters used by main().
BIT_ARRAY_SIZE = 150_000_000  # number of bits
NUM_HASHES = 7  # hash indices derived per word

# Command-line interface: two required positional paths.
#   source -> wordlist inserted into the Bloom filter
#   test   -> wordlist looked up against the filter
# NOTE: parsing happens at module level, so it runs whenever this
# module is imported or executed, not only from main().
parser = argparse.ArgumentParser()
parser.add_argument("source")
parser.add_argument("test")
args = parser.parse_args()
source_file, test_file = args.source, args.test
class BloomFilter:
    """
    Probabilistic data structure for membership testing.

    Supports fast lookups with possible false positives
    but no false negatives: adding a word sets ``num_hashes``
    bits, and a lookup reports a hit only when all of those
    bits are set.
    """

    # Width in bytes of the per-hash seed appended to the word.
    _SEED_BYTES = 2

    def __init__(self, size: int, num_hashes: int):
        """
        Create an empty filter.

        size: number of bits in the bit array; must be >= 1.
        num_hashes: number of hash indices derived per word;
            must be between 1 and 65536 (the range a 2-byte
            seed can represent).

        Raises ValueError if either argument is out of range.
        """
        # Validate up front: size == 0 would otherwise surface later as a
        # ZeroDivisionError in _hashes(), and num_hashes == 0 would make
        # contains() report True for every word.
        if size < 1:
            raise ValueError(f"size must be a positive integer, got {size!r}")
        max_hashes = 1 << (8 * self._SEED_BYTES)
        if not 1 <= num_hashes <= max_hashes:
            raise ValueError(
                f"num_hashes must be between 1 and {max_hashes}, "
                f"got {num_hashes!r}"
            )

        self.size = size
        self.num_hashes = num_hashes
        self.bits = bitarray(size)
        # Clear every bit explicitly before first use.
        self.bits.setall(0)

    def _hashes(self, word: str):
        """
        Generate hash indices for a word using multiple
        BLAKE2b-based hash functions.

        Yields ``num_hashes`` integers in ``range(size)``. The i-th
        index hashes the UTF-8 bytes of the word followed by i
        encoded as a little-endian seed.
        """
        # errors="ignore" drops characters that cannot be encoded as
        # UTF-8 instead of raising.
        word_bytes = word.encode("utf-8", errors="ignore")
        for i in range(self.num_hashes):
            seed = i.to_bytes(self._SEED_BYTES, "little")
            digest = hashlib.blake2b(word_bytes + seed, digest_size=8).digest()
            # Reduce the 64-bit digest to a valid bit position.
            yield int.from_bytes(digest, "little") % self.size

    def add(self, word: str) -> None:
        """
        Insert a word into the Bloom filter.
        """
        for index in self._hashes(word):
            self.bits[index] = 1

    def contains(self, word: str) -> bool:
        """
        Return True if the word may exist in the filter.

        False positives are possible, but false negatives
        should not occur.
        """
        # all() stops at the first unset bit, like an early return.
        return all(self.bits[index] for index in self._hashes(word))

    def __contains__(self, word: str) -> bool:
        """
        Support ``word in bloom_filter``; equivalent to contains(word).
        """
        return self.contains(word)
def load_words(filename):
    """
    Read a text file and return its non-blank lines as a list.

    Each line has surrounding whitespace stripped; lines that are
    empty after stripping are skipped. The file is decoded as
    ISO-8859-1.
    """
    with open(filename, "r", encoding="ISO-8859-1", errors="ignore") as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        return [entry for entry in stripped_lines if entry]
# ====== MAIN ======
def main():
    """
    Build a Bloom filter from a source wordlist and
    evaluate it against a test dataset.

    Reads the module-level ``source_file`` and ``test_file`` paths,
    inserts every source word into a Bloom filter, then compares the
    filter's answer for each test word with an exact ``set`` lookup.
    Prints the confusion-matrix counts, the false positive rate and
    the total runtime.
    """
    # perf_counter() is monotonic, so the measured interval cannot be
    # skewed by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    print("Loading source...")
    source_words = load_words(source_file)
    print("Loading test...")
    test_words = load_words(test_file)

    # Exact membership oracle used as ground truth for the filter.
    print("Building reference set...")
    reference_set = set(source_words)

    print("Building Bloom filter...")
    bloom = BloomFilter(BIT_ARRAY_SIZE, NUM_HASHES)
    for word in source_words:
        bloom.add(word)

    print("Testing dictionary...")
    tp = tn = fp = fn = 0
    for word in test_words:
        bloom_result = bloom.contains(word)
        actual_result = word in reference_set
        # Each word falls into exactly one confusion-matrix cell.
        if bloom_result:
            if actual_result:
                tp += 1
            else:
                fp += 1
        elif actual_result:
            fn += 1
        else:
            tn += 1

    elapsed = time.perf_counter() - start_time
    total = len(test_words)
    # FPR = FP / (FP + TN); reported as 0 when there are no actual negatives.
    negatives = fp + tn
    fpr = fp / negatives if negatives > 0 else 0.0

    print("\n===== RESULTS =====")
    print(f"Bit array size: {BIT_ARRAY_SIZE}")
    print(f"Hash functions: {NUM_HASHES}")
    print(f"Rockyou size: {len(source_words)}")
    print(f"Dictionary size: {total}")
    print()
    print(f"True Positive: {tp}")
    print(f"True Negative: {tn}")
    print(f"False Positive: {fp}")
    print(f"False Negative: {fn}")
    print()
    print(f"False Positive Rate: {fpr:.6f}")
    print(f"Runtime: {elapsed:.2f} seconds")
# Run the evaluation only when this file is executed as a script.
if __name__ == "__main__":
    main()