【C++】哈希相关问题

发布于:2022-11-13 ⋅ 阅读:(903) ⋅ 点赞:(0)

目录

一、哈希概念

1、哈希基本概念

2、常见的三种哈希结构

3、哈希函数

1、直接定址法

2、除留余数法

4、字符串哈希算法

5、哈希冲突(哈希碰撞)

1、闭散列

2、开散列

三、unordered_set、unordered_set的模拟实现

1、unordered_set封装

2、unordered_map封装

3、一个类型K去做set和unordered_set它的模板参数有什么要求?

四、原地哈希

五、位图、布隆过滤器

1、背景

2、实现

3、位图的应用

五、布隆过滤器

1、布隆过滤器提出原因

2、布隆过滤器概念

3、布隆过滤器实现

4、布隆过滤器优点

5、布隆过滤器缺点

6、布隆过滤器应用

六、海量数据处理

1. 给两个文件,分别有100亿个query,我们只有1G内存,如何找到两个文件交集?分别给出精确算法和近似算法

2、给一个超过100G大小的log fifile, log中存着IP地址, 设计算法找到出现次数最多的IP地址?与上题条件相同,如何找到top K的IP?


一、哈希概念

1、哈希基本概念

顺序结构以及平衡树中,元素关键码与其存储位置之间没有对应的关系,因此在查找一个元素
时,必须要经过关键码的多次比较顺序查找时间复杂度为O(N),平衡树中为树的高度,即
O(lonN),搜索的效率取决于搜索过程中元素的比较次数。
理想的搜索方法:可以不经过任何比较,一次直接从表中得到要搜索的元素
如果构造一种存储结构,通过某种函数(hashFunc)使元素的存储位置与它的关键码之间能够建立
一一映射的关系,那么在查找时通过该函数可以很快找到该元素
该方法就是哈希。

哈希表(英文名字为Hash table,国内也有一些算法书籍翻译为散列表,大家看到这两个名称知道都是指hash table就可以了)。

哈希表是根据关键码的值而直接进行访问的数据结构。

这么这官方的解释可能有点懵,其实直白来讲其实数组就是一张哈希表。

哈希表中关键码就是数组的索引下标,然后通过下标直接访问数组中的元素,如下图所示:

那么哈希表能解决什么问题呢,一般哈希表都是用来快速判断一个元素是否出现集合里。

例如要查询一个名字是否在这所学校里。

要枚举的话时间复杂度是O(n),但如果使用哈希表的话, 只需要O(1)就可以做到。

我们只需要初始化把这所学校里学生的名字都存在哈希表里,在查询的时候通过索引直接就可以知道这位同学在不在这所学校里了。

将学生姓名映射到哈希表上就涉及到了hash function ,也就是哈希函数

 

2、常见的三种哈希结构

当我们想使用哈希来解决问题的时候,我们一般会选择如下三种数据结构

1、数组

2、set(集合)

3、map(映射)

set系列底层实现及优劣

集合 底层实现 是否有序 数值是否可以重复 能否更改数值 查询效率 增删效率
set 红黑树 有序 O(logN) O(logN)
multiset 红黑树 有序 O(logN) O(logN)
unordered_set 哈希表 无序 O(1) O(1)

map系列底层实现及优劣

集合 底层实现 是否有序 数值是否可以重复 能否更改数值 查询效率 增删效率

map

红黑树 key有序 key不可以重复 key不可以修改 O(logN) O(logN)
multimap 红黑树 key有序 key可以重复 key不可以修改 O(logN) O(logN)
unordered_map 哈希表 key无序 key不可以重复 key不可以修改 O(1) O(1)

3、哈希函数

1、直接定址法

取关键字的某个线性函数为散列地址Hash[key] = A * key + B

优点:简单,均匀

缺点:需要事先知道关键字分布情况

使用场景:适合查找比较小且连续的情况

例如:计数排序,查找字符串中只出现一次的字符

2、除留余数法

  设散列表中允许的地址数为m,取一个不大于m,但最接近或者等于m的质数p作为除数,按照哈希函数:Hash(key) = key% p(p<=m),将关键码转换成哈希地址  

存在哈希冲突,重点在于解决哈希冲突

4、字符串哈希算法

字符串并不像int和char类似是整型或者能够转换成整型,这时就需要我们的字符串哈希算法

//BKDR Hash Function  
//该算法由于在Brian Kernighan与Dennis Ritchie的《The C Programming Language》一书被展示而得 名,是一种简单快捷的hash算法,也是Java目前采用的字符串的Hash算法(累乘因子为31)。  
template<class T>  
size_t BKDRHash(const T *str)  
{  
    size_t hash = 0;  
    while (size_t ch = (size_t)*str++)  
    {         
        hash = hash * 131 + ch;   // 也可以乘以31、131、1313、13131、131313..           
    }  

    return hash;  
}  

//DJB Hash Function 2  
//由Daniel J. Bernstein 发明的另一种hash算法。  
template<class T>  
size_t DJB2Hash(const T *str)  
{  
    if(!*str)
    {  
        return 0;  
    }
    size_t hash = 5381;  
    while (size_t ch = (size_t)*str++)  
    {  
        hash = hash * 33 ^ ch;  
    }  

    return hash;  
}  

5、哈希冲突(哈希碰撞)

 

学生姓名按照一定的哈希函数映射到特定数据结构中,小李和小王都映射到了1,这就是哈希冲突又叫哈希碰撞

解决哈希冲突常用两种方法

闭散列,开散列

1、闭散列

闭散列:也叫开放定址法,当发生哈希冲突时,如果哈希表未被装满,说明在哈希表中必然还有空
位置,那么可以把key存放到冲突位置中的“下一个” 空位置中

找到空位置主要有两种方法,线性探测法,二次探测法    这里重点实现线性探测法

假如我们要插入11   hash(11) = 11 % 10 = 1

这时我们发现 下标为1的地方已经被占用了,那么11就会映射到从下标为1开始向后查找第一个没有被映射的位置

在数组中的删除并不是真的删除,我们可以标识这个位置数据无效

enum State
{
    EMPTY,
    EXIST,
    DELETE
};

这时我们就可以使用枚举类型

同时我们的线性探测法数据结构的主体是vector,我们在vector中不但要存储它的数据还要存储它的状态,我们可以使用一个pair

template<class K, class V>
struct HashData
{
    pair<K, V> _kv;
    State _state;
};

哈希表什么情况下进行扩容?如何扩容?

散列表的负载因子定义:负载因子 = 哈希表中元素 / 散列表的长度

负载因子到了一个基准值就扩容

基准值越大,冲突越多,效率越低,空间利用率越高

基准值越小,冲突越少,效率越高,空间利用率越低

线性探测法:在某个位置冲突很多的情况下,互相占用冲突一片(洪水效用)


对于哈希来说线性探测法并不是主要使用的方法,并且在现实中也并不常用,所以只实现了它的insert。

因为线性探测法它有致命的缺点,虽然它解决了哈希冲突,但是如果冲突的位置过多时,它的查找效率就会退化十分严重,极端情况查询效率退化为O(N)

#pragma once
#include <vector>

using namespace std;

enum State
{
    EMPTY,
    EXIST,
    DELETE
};

template<class K, class V>
struct HashData
{
    pair<K, V> _kv;
    State _state;
};

template<class K, class V>
class HashTable
{
public:
    bool insert(const pair<K, V>& kv)
    {
        if(_tables.size() == 0 || _tables.size() * 10 >= 7)
        {   size_t newSize = _tables.size() == 0 ? 10 : _tables.size() * 2;
            HashTable<K, V> newTables;
            newTables._tables.resize(newSize);
            for(const auto& e : _tables)
            {
                if(e._tate == EXIST)
                {
                    newTables.insert(e._kv);
                }
            }
            _tables.swap(newTables._tables);
        }

        size_t hashi = kv.first % _tables.size();
        while(_tables[hashi]._state == EXIST)
        {
            hashi++;
            hashi %= _tables.size();
        }
        _tables[hashi] = kv;
        _tables[hashi]._state = EXIST;
        _size++;
    }

private:
    vector<HashData<K, V>> _tables;
    size_t _size = 0;
};

闭散列线性探测法实现代码

2、开散列

开散列法又叫链地址法(开链法),首先对关键码集合用散列函数计算散列地址,具有相同地址的关
键码归于同一子集合,每一个子集合称为一个桶,各个桶中的元素通过一个单链表链接起来,各链
表的头结点存储在哈希表中。

 开散列直白的说就是vector内存储了链表的节点,使用每个节点存储在vector中哪一个下标来作为映射值

那么就不需要存储每个节点的状态了

插入时就寻找要映射的下标,寻找插入节点是否已经在,如果存在就直接返回

不存在就将节点头插入链表中

扩容时就不能像我们线性探测法一样直接创建一个新的哈希表,然后调用新哈希表的insert,最后交换两个哈希表的数据。

因为拉链法并不像线性探测法一样,使用数组作为哈希表

拉链法使用的是链表加数组,我们可以直接创建一个新的vector然后将原哈希表中vector的每一个节点插入到新的vector,最后交换两个vector的数据。

这里扩容也是有一定的学问的,我们不能直接粗暴的将哈希表扩容到原来的2倍,为了降低冲突的概率,我们每次扩容时都会扩容到原哈希表的二倍左右的素数

stl库中是定义了一个素数表来实现的

    size_t __stl_next_prime(size_t n)
	{
		static const size_t __stl_num_primes = 28;
		static const size_t __stl_prime_list[__stl_num_primes] =
		{
			53, 97, 193, 389, 769,
			1543, 3079, 6151, 12289, 24593,
			49157, 98317, 196613, 393241, 786433,
			1572869, 3145739, 6291469, 12582917, 25165843,
			50331653, 100663319, 201326611, 402653189, 805306457,
			1610612741, 3221225473, 4294967291
		};

		for (size_t i = 0; i < __stl_num_primes; ++i)
		{
			if (__stl_prime_list[i] > n)
			{
				return __stl_prime_list[i];
			}
		}

		return -1;
	}

这里insert使用了两个仿函数Hash和KeyOfT

因为某些数据类型是不支持取模操作的,所以需要我们使用仿函数,将这些数据类型转换为可以取模的类型,KeyOfT是仿照前面实现的map和set的实现,哈希表并不知道你要传入的数据类型

    pair<iterator, bool> Insert(const T& data)
    {
        Hash hash;
        KeyOfT kot;
        iterator ret = Find(kot(data));
        if(ret != end())
        {
            return make_pair(ret, false);
        }

        if(_size == _tables.size())
        {
            size_t newSize = __stl_next_prime(_size);
            vector<Node*> newTables;
            newTables.resize(newSize, nullptr);
            for(size_t i = 0; i < _tables.size(); i++)
            {
                Node* cur = _tables[i];
                while(cur)
                {
                    Node* next = cur->_next;
                    size_t hashi = hash(kot(cur->_data)) % newSize;
                    cur->_next = newTables[hashi];
                    newTables[hashi] = cur;
                    cur = next;
                }
                _tables[i] = nullptr;
            }
            _tables.swap(newTables);
        }

        size_t hashi = hash(kot(data)) % _tables.size();
        Node* newNode = new Node(data);
        newNode->_next = _tables[hashi];
        _tables[hashi] = newNode;
        _size++;
        return make_pair(iterator(newNode, this), true);
    }

删除时也是同样道理,寻找删除节点是否存在,如果存在,就去映射下标所对应的链表中寻找该节点,然后就是熟悉的操作了,删除链表的节点

   bool Erase(const K& key)
    {
        if(_tables.size() == 0 || _size == 0)
        {
            return false;
        }
        Hash hash;
        KeyOfT kot;
        size_t hashi = hash(key) % _tables.size();
        Node* cur = _tables[hashi];
        Node* prev = nullptr;
        while(cur)
        {
            if(kot(cur->_data) == key)
            {
                if(prev == nullptr)
                {
                    _tables[hashi] = cur->_next;
                }
                else
                {
                    prev->_next = cur->_next;
                }
                delete cur;
                _size--;
                return true;
            }
            prev = cur;
            cur = cur->_next;
        }
        return false;
    }

find较为简单就直接给出了代码

    bool Find(const K& key)
    {
        if(_tables.size() == 0)
        {
            return false;
        }
        Hash hash;
        KeyOfT kot;
        size_t hashi = hash(key) % _tables.size();
        Node* cur = _tables[hashi];
        while(cur)
        {
            if(kot(cur->_data) == key)
            {
                return true;
            }
            cur = cur->_next;
        }
        return false;
    }

析构函数:这里需要我们手动显式写出

因为vecotr调用析构函数,并不能销毁我们的链表

    ~HashTable()
	{
		for (size_t i = 0; i < _tables.size(); ++i)
		{
			Node* cur = _tables[i];
			while (cur)
			{
				Node* next = cur->_next;
				delete cur;
				cur = next;
			}
			_tables[i] = nullptr;
		}
	}

三、unordered_set、unordered_set的模拟实现

unordered_set和unordered_map主要依赖于哈希表的insert和erase以及迭代器

与set和map类似

首先要实现迭代器,通过查看stl源码可以知道,HashTable的迭代器中不但有节点的指针,还有,哈希表的指针,这是为了方便++时,跳转到下一个哈希桶

它的迭代器主要有operator++   operator*  operator->  operator== operator!=这几个运算符重载,

它没有--,因为它是单向迭代器

template<class K, class T, class Hash, class KeyOfT>
class HashTable;

template<class K, class T, class Hash, class KeyOfT>
struct __HashIterator
{
    typedef HashNode<T> Node;
    typedef HashTable<K, T, Hash, KeyOfT> HT;
    typedef __HashIterator<K, T, Hash, KeyOfT> self;

    Node* _node;
    HT* _pht;

    __HashIterator(Node* node, HT* pht)
        :_node(node)
        ,_pht(pht)
    {}

    T& operator*()
    {
        return _node->_data;
    }

    T* operator->()
    {
        return &_node->_data;
    }

    self& operator++()
    {
        if(_node->_next)
        {
            _node = _node->_next;
        }
        else
        {
            Hash hash;
            KeyOfT kot;
            size_t i = hash(kot(_node->_data)) % _pht->_tables.size();
            i++;
            for(; i < _pht->_tables.size(); i++)
            {
                if(_pht->_tables[i])
                {
                    _node = _pht->_tables[i];
                    break;
                }
            }

            if(i == _pht->_tables.size())
            {
                _node = nullptr;
            }
        }
        return *this;
    }

    bool operator!=(const self& s) const
    {
        return _node != s._node;
    }

    bool operator==(const self& s) const
    {
        return _node == s._node;
    }
};

前面有哈希表的声明,因为迭代器需要哈希表,并且哈希表也需要迭代器

哈希表迭代器版本代码,因为与前面代码相比,就增加了迭代器,所以直接给出代码


template<class K, class T, class Hash, class KeyOfT>
class HashTable
{   
    typedef HashNode<T> Node;

    template<class K, class T, class Hash, class KeyOfT>
	friend struct __HashIterator;
public:
 
    typedef __HashIterator<K, T, Hash, KeyOfT> iterator;

    ~HashTable()
	{
		for (size_t i = 0; i < _tables.size(); ++i)
		{
			Node* cur = _tables[i];
			while (cur)
			{
				Node* next = cur->_next;
				delete cur;
				cur = next;
			}
			_tables[i] = nullptr;
		}
	}

    iterator begin()
    {
        for(size_t i = 0; i < _tables.size(); i++)
        {
            if(_tables[i])
            {
                return iterator(_tables[i], this);
            }
        }
        return iterator(nullptr, this);
    }

    iterator end()
    {
        return iterator(nullptr, this);
    }

    size_t __stl_next_prime(size_t n)
	{
		static const size_t __stl_num_primes = 28;
		static const size_t __stl_prime_list[__stl_num_primes] =
		{
			53, 97, 193, 389, 769,
			1543, 3079, 6151, 12289, 24593,
			49157, 98317, 196613, 393241, 786433,
			1572869, 3145739, 6291469, 12582917, 25165843,
			50331653, 100663319, 201326611, 402653189, 805306457,				1610612741, 3221225473, 4294967291
		};

		for (size_t i = 0; i < __stl_num_primes; ++i)
		{
			if (__stl_prime_list[i] > n)
			{
				return __stl_prime_list[i];
			}
		}

		return -1;
	}

    pair<iterator, bool> Insert(const T& data)
    {
        Hash hash;
        KeyOfT kot;
        iterator ret = Find(kot(data));
        if(ret != end())
        {
            return make_pair(ret, false);
        }

        if(_size == _tables.size())
        {
            size_t newSize = __stl_next_prime(_size);
            vector<Node*> newTables;
            newTables.resize(newSize, nullptr);
            for(size_t i = 0; i < _tables.size(); i++)
            {
                Node* cur = _tables[i];
                while(cur)
                {
                    Node* next = cur->_next;
                    size_t hashi = hash(kot(cur->_data)) % newSize;
                    cur->_next = newTables[hashi];
                    newTables[hashi] = cur;
                    cur = next;
                }
                _tables[i] = nullptr;
            }
            _tables.swap(newTables);
        }

        size_t hashi = hash(kot(data)) % _tables.size();
        Node* newNode = new Node(data);
        newNode->_next = _tables[hashi];
        _tables[hashi] = newNode;
        _size++;
        return make_pair(iterator(newNode, this), true);
    }

    iterator Find(const K& key)
    {
        if(_tables.size() == 0)
        {
            return iterator(nullptr, this);
        }
        Hash hash;
        KeyOfT kot;
        size_t hashi = hash(key) % _tables.size();
        Node* cur = _tables[hashi];
        while(cur)
        {
            if(kot(cur->_data) == key)
            {
                return iterator(cur, this);
            }
            cur = cur->_next;
        }
        return iterator(nullptr, this);
    }

    bool Erase(const K& key)
    {
        if(_tables.size() == 0 || _size == 0)
        {
            return false;
        }
        Hash hash;
        KeyOfT kot;
        size_t hashi = hash(key) % _tables.size();
        Node* cur = _tables[hashi];
        Node* prev = nullptr;
        while(cur)
        {
            if(kot(cur->_data) == key)
            {
                if(prev == nullptr)
                {
                    _tables[hashi] = cur->_next;
                }
                else
                {
                    prev->_next = cur->_next;
                }
                delete cur;
                _size--;
                return true;
            }
            prev = cur;
            cur = cur->_next;
        }
        return false;
    }

    size_t Size() const
	{
		return _size;
	}

private:
    vector<Node*> _tables;
    size_t _size = 0;
};

1、unordered_set封装

unordered_set的封装与set类似,直接使用哈希表的成员函数


template<class K, class Hash = HashFunc<K>>
class unordered_set
{
    struct SetKeyOfT
	{
		const K& operator()(const K& key)
		{
			return key;
		}
	};
public:
    typedef typename HashTable<K, K, Hash, SetKeyOfT>::iterator iterator;
    //获取类的内嵌类型,加上typename

    iterator begin()
    {
        return _ht.begin();
    }

    iterator end()
    {
        return _ht.end();
    }

    pair<iterator, bool> insert(const K& key)
	{
		return _ht.Insert(key);
	}
private:
    HashTable<K, K, Hash, SetKeyOfT> _ht;
};

2、unordered_map封装


template<class K, class V, class Hash = HashFunc<K>>
class unordered_map
{
    struct MapKeyOfT
    {
        const K& operator()(const pair<K, V>& kv)
        {
            return kv.first;
        }
    };
public:
    typedef typename HashTable<K, pair<K, V>, Hash, MapKeyOfT>::iterator iterator;

    iterator begin()
    {
        return _ht.begin();
    }

    iterator end()
    {
        return _ht.end();
    }

    pair<iterator, bool> insert(const pair<K, V>& kv)
    {
        return _ht.Insert(kv);
    }

    V& operator[](const K& key)
	{
		pair<iterator, bool> ret = _ht.Insert(make_pair(key, V()));
		return ret.first->second;
	}
private:
    HashTable<K, pair<K, V>, Hash, MapKeyOfT> _ht;
};

3、一个类型K去做set和unordered_set它的模板参数有什么要求?

set:要求支持小于比较,或者显式提供比较的仿函数

unordered_set:a、K类型对象可以转换整型取模或者提供转换整型的仿函数

                           b、K类型对象可以支持等于比较或者提供等于比较的仿函数  

四、原地哈希

题目的描述是这样的:给你一个未排序的整数数组nums,请你找出其中没有出现的最小正整数

请你实现时间复杂度位O(N),并且只使用常数级别额外空间的解决方案


一句话题解

本题的难度在于只能使用常数级别的额外空间,在这个限制下本题的思路有一个非正式的名称:原地哈希。

由于题目要求我们只能使用常数级别空间,而要寻找的数字一定在{1, N + 1}左闭右开这个区间里面。因此我们可以把原始数组当成哈希表来使用

我们要找的数字在{1, N + 1}里面,最后N + 1这个元素可以不用寻找,因为我们一定是在前面N个元素找不到的情况下才会去寻找N + 1这个元素

我们可以采取这样的思路:把1这个数放到下标为0的位置,2放到下标为1的位置,按照这个思路将数组整理一遍,然后再遍历一遍数组,第一个遇到的它的值不等于下标+1的那个数就是我们要寻找的数字

class Solution {
public:
    int firstMissingPositive(vector<int>& nums) {
        int n = nums.size();
        for(int i = 0; i < n; i++)
        {
            while(nums[i] > 0 && nums[i] <= n && nums[nums[i] - 1] != nums[i])
            {
                swap(nums[nums[i] - 1], nums[i]);
            }
        }
        for(size_t i = 0; i < nums.size(); i++)
        {
            if(nums[i] != i + 1)
            {
                return i + 1;
            }
        }
        return n + 1;
    }
};

五、位图、布隆过滤器

1、背景

问题:给40亿个不重复的无符号整数,没有排序,给一个无符号整数,如何快速判断一个数是否在这40亿个数中?

1、排序 + 二分查找

时间复杂度O(N * logN),看似高效其实并不然

因为这么多的数据,我们不可能将它们全部加载到内存中,只能使用外排序,例如归并排序

但是是非常的慢,因为是在磁盘中排序,磁盘比内存慢的多,其次在磁盘中使用二分查找并不容易支持

2、使用搜索树或者哈希表

这种方法也不太现实,搜索树不但要存储数据还要存储它的三叉链等其它属性,也并不可能将它全部存储进来

3、使用位图

数据是否在给定的整形数据中,结果是在或者不在,刚好是两种状态,那么可以使用一个二进制比
特位来代表数据是否存在的信息,如果二进制比特位为1,代表存在,为0代表不存在

位图是key模型,用来判断数据是否存在,并且数据只能是整型

位图的大小与你的数据范围有关,与你的数据多少无关

一个位图最多占用512MB,整型的范围是0 ~ 4 294 967 295

2、实现

因为C++并不支持按位开空间,所以我们使用一个vector或者数组

数组每个元素类型是char,一个char在32位系统下是8个比特位,我们可以使用一个非类型模板参数模8 + 1来算出数组的长度

假如插入一个数字x,我们先让x / 8 算出它在哪一个char,然后 x % 8算出它在这个char的第几个位

最后将这个位搞成0

size_t i = x / 8;

size_t j = x % 8;

我们让1左移j位就找到了对应的位,然后 | 使这位置为1,而不影响其它位

删除就是将这个位置为0

还是找到对应的位,不同的是 我们要 & ~(1 << j)

查找是看这一位是否为1  &(1 << j)

template<size_t N>
    class BitSet
    {
    public:
        BitSet()
        {
            _bits.resize(N / 8 + 1);
        }

        void set(size_t x)
        {
            size_t i = x / 8;
            size_t j = x % 8;
            _bits[i] |= (1 << j);
        }

        void reset(size_t x)
        {
            size_t i = x / 8;
            size_t j = x % 8;
            _bits[i] &= ~(1 << j);
        }

        bool test(size_t x)
        {
            size_t i = x / 8;
            size_t j = x % 8;
            return _bits[i] & (1 << j);
        }
    private:
        vector<char> _bits;
        size_t _ratio = 5;
    };

 

C++库中是有这个数据结构的,但是这个数据结构有一点问题

它使用的是一个静态数组来实现的位图,处理的数据量太大时可能会出现栈溢出

我们上面的版本就不会出现这种情况,我们使用的是vector,vector是在堆区开空间

3、位图的应用

1、快速查找某个数据是否在一个集合中

2、排序 + 去重

3、求两个集合的交集,并集

4、操作系统中磁盘块标记

五、布隆过滤器

1、布隆过滤器提出原因

我们在使用新闻客户端看新闻时,它会给我们不停地推荐新的内容,它每次推荐时要去重,去掉那些已经看过的内容。问题来了,新闻客户端推荐系统如何实现推送去重的? 用服务器记录了用户看过的所有历史记录,当推荐系统推荐新闻时会从每个用户的历史记录里进行筛选,过滤掉那些已经存在的记录。 如何快速查找呢?
1. 用哈希表存储用户记录,缺点:浪费空间
2. 用位图存储用户记录,缺点:位图一般只能处理整形,如果内容编号是字符串,就无法处理 了。
3. 将哈希与位图结合,即布隆过滤器

2、布隆过滤器概念

布隆过滤器(英语:Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。主要用于判断一个元素是否在一个集合中。

通常我们会遇到很多要判断一个元素是否在某个集合中的业务场景,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。但是随着集合中元素的增加,我们需要的存储空间也会呈现线性增长,最终达到瓶颈。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为O(n),O(logn),O(1)。

这个时候,布隆过滤器(Bloom Filter)就应运而生。

3、布隆过滤器实现

布隆过滤器是通过字符串哈希算法,将字符串转换成整型去映射一个位置进行标记

不过它会存在一定概率的误判,因为字符串是有无穷多个的,很可能某几个字符串就会冲突

对于字符串存在是有误判的,但是对于字符串不在是没有误判的,不在就是不在

为了降低误判率,我们可以让每个值通过不同的字符串哈希算法映射多个位

只有多个标记位都有被映射,它才可能存在

只要有一个位没有被映射,那么它就不存在

理论而言:一个值映射的位越多,误判的概率越低,但是也不能映射太多的位,映射位越多,空间消耗越多

很显然,过小的布隆过滤器很快所有的 bit 位均为 1,那么查询任何值都会返回“可能存在”,起不到过滤的目的了。布隆过滤器的长度会直接影响误报率,布隆过滤器越长其误报率越小。

另外,哈希函数的个数也需要权衡,个数越多则布隆过滤器 bit 位置位 1 的速度越快,且布隆过滤器的效率越低;但是如果太少的话,那我们的误报率会变高。

为了确定哈希函数与布隆过滤器长度及插入元素个数的关系这里贴一个公式

k为哈希函数的个数,m为布隆过滤器长度,n为插入元素的个数,p为误报率


传统的布隆过滤器是不支持删除的,支持可能又会导致其他问题

例如:造成误删,删除某个元素,可能会查不到另一个元素

如果要实现删除版本的布隆过滤器,就要使用多个位表示一个位置,作为计数器

但是为了支持删除,空间消耗就会增加,使布隆过滤器优势减弱


struct HashBKDR
{
	// BKDR
	size_t operator()(const string& key)
	{
		size_t val = 0;
		for (auto ch : key)
		{
			val *= 131;
			val += ch;
		}

		return val;
	}
};

struct HashAP
{
	// AP
	size_t operator()(const string& key)
	{
		size_t hash = 0;
		for (size_t i = 0; i < key.size(); i++)
		{
			if ((i & 1) == 0)
			{
				hash ^= ((hash << 7) ^ key[i] ^ (hash >> 3));
			}
			else
			{
				hash ^= (~((hash << 11) ^ key[i] ^ (hash >> 5)));
			}
		}
		return hash;
	}
};

struct HashDJB
{
	// DJB
	size_t operator()(const string& key)
	{
		size_t hash = 5381;
		for (auto ch : key)
		{
			hash += (hash << 5) + ch;
		}

		return hash;
	}
};

template<size_t N,
class K = string, class Hash1 = HashBKDR, class Hash2 = HashAP, class Hash3 = HashDJB>
class BloomFilter
{
public:
    void set(const K& key)
    {
        size_t hash1 = Hash1()(key) % (_ratio * N);
        size_t hash2 = Hash2()(key) % (_ratio * N);
        size_t hash3 = Hash3()(key) % (_ratio * N);
        _bis->set(hash1);
        _bis->set(hash2);
        _bis->set(hash3);
    }

    bool test(const K& key)
    {
        size_t hash1 = Hash1()(key) % (_ratio * N);
        if(!_bis->test(hash1))
        {
            return false;
        }

        size_t hash2 = Hash2()(key) % (_ratio * N);
        if(!_bis->test(hash2))
        {
            return false;
        }

        size_t hash3 = Hash3()(key) % (_ratio * N);
        if(!_bis->test(hash3))
        {
            return false;
        }

        return true;
    }
private:
    const static size_t _ratio = 5;
    bitset<_ratio * N>* _bis = new bitset<_ratio * N>;
};

这里使用了三个哈希函数来映射,可以根据实际需求自行添加哈希函数

 

4、布隆过滤器优点

1. 增加和查询元素的时间复杂度为 :O(K), (K 为哈希函数的个数,一般比较小 ) ,与数据量大小无关
2. 哈希函数相互之间没有关系,方便硬件并行运算
3. 布隆过滤器不需要存储元素本身,在某些对保密要求比较严格的场合有很大优势
4. 在能够承受一定的误判时,布隆过滤器比其他数据结构有这很大的空间优势
5. 数据量很大时,布隆过滤器可以表示全集,其他数据结构不能
6. 使用同一组散列函数的布隆过滤器可以进行交、并、差运算

5、布隆过滤器缺点

1. 有误判率,即存在假阳性 (False Position) ,即不能准确判断元素是否在集合中 ( 补救方法:再建立一个白名单,存储可能会误判的数据)
2. 不能获取元素本身
3. 一般情况下不能从布隆过滤器中删除元素
4. 如果采用计数方式删除,可能会存在计数回绕问题

6、布隆过滤器应用

想要拦截垃圾邮件或者是骚扰电话,我们如果直接去数据库中查黑名单,效率是比较低的

我们可以使用布隆过滤器,因为布隆过滤器判读元素不在是准确的,所以可以直接将不在黑名单的

过滤掉,剩下的就是可能是在黑名单中的,剩下的这部分在数据库中查询,就可以提高效率

另一种应用是在账号注册页面,我们使用布隆过滤器,快速输入提示,昵称是否被占用

这是可以存在误判的情况

六、海量数据处理

1. 给两个文件,分别有100亿个query,我们只有1G内存,如何找到两个文件交集?分别给出精确算法和近似算法

(1)近似算法:使用两个布隆过滤器分别存储两个文件,可以从头遍历布隆过滤器,只有两个布隆过滤器同时能够查到的元素,就是交集(前提,两个布隆过滤器使用相同的哈希算法)
或者将两个布隆过滤器的元素&,为1的就是公共元素
(2)精确算法:使用哈希分割
假设这两个文件分别是A,B,依次读取文件A,B中的query,使用哈希字符串算法 i = Hash(query) % 1000,让这个query分别进入Ai   Bi
相同的query一定会进入相同编号的小文件
因为:同一个query,使用相同的字符串哈希算法得到的i一定是相同的
然后将每个小文件放到内存中使用unordered_set或者set,编号相同的小文件找交集

2、给一个超过100G大小的log fifile, log中存着IP地址, 设计算法找到出现次数最多的IP地址?与上题条件相同,如何找到top K的IP?

同样使用哈希分割,分割成小文件,在每个小文件中依次使用map<string, int>对每个小文件统计次数,找到每个文件中出现次数最多的IP

topK问题,建立一个K个值为pair<ip, count>的小堆

本文含有隐藏内容,请 开通VIP 后查看