者:kylinkzhang,CSIG后台开发工程师
| 导语 一致性Hash算法是解决分布式缓存等问题的一种算法; 本文介绍了一致性Hash算法的原理,并给出了一种实现和实际运用的案例;
考虑这么一种场景:
我们有三台缓存服务器编号node0、node1、node2,现在有3000万个key,希望可以将这些个key均匀的缓存到三台机器上,你会想到什么方案呢?
我们可能首先想到的方案是:取模算法hash(key)% N,即:对key进行hash运算后取模,N是机器的数量;
这样,对key进行hash后的结果对3取模,得到的结果一定是0、1或者2,正好对应服务器node0、node1、node2,存取数据直接找对应的服务器即可,简单粗暴,完全可以解决上述的问题;
取模算法虽然使用简单,但对机器数量取模,在集群扩容和收缩时却有一定的局限性:因为在生产环境中根据业务量的大小,调整服务器数量是常有的事;
而服务器数量N发生变化后hash(key)% N计算的结果也会随之变化!
比如:一个服务器节点挂了,计算公式从hash(key)% 3变成了hash(key)% 2,结果会发生变化,此时想要访问一个key,这个key的缓存位置大概率会发生改变,那么之前缓存key的数据也会失去作用与意义;
大量缓存在同一时间失效,造成缓存的雪崩,进而导致整个缓存系统的不可用,这基本上是不能接受的;
为了解决优化上述情况,一致性hash算法应运而生~
一致性哈希算法在 1997 年由麻省理工学院提出,是一种特殊的哈希算法,在移除或者添加一个服务器时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系;
一致性哈希解决了简单哈希算法在分布式哈希表(Distributed Hash Table,DHT)中存在的动态伸缩等问题;
一致性hash算法本质上也是一种取模算法;
不过,不同于上边按服务器数量取模,一致性hash是对固定值2^32取模;
IPv4的地址是4组8位2进制数组成,所以用2^32可以保证每个IP地址会有唯一的映射;
我们可以将这2^32个值抽象成一个圆环??,圆环的正上方的点代表0,顺时针排列,以此类推:1、2、3…直到2^32-1,而这个由2的32次方个点组成的圆环统称为hash环;
在对服务器进行映射时,使用hash(服务器ip)% 2^32,即:
使用服务器IP地址进行hash计算,用哈希后的结果对2^32取模,结果一定是一个0到2^32-1之间的整数;
而这个整数映射在hash环上的位置代表了一个服务器,依次将node0、node1、node2三个缓存服务器映射到hash环上;
在对对应的Key映射到具体的服务器时,需要首先计算Key的Hash值:hash(key)% 2^32;
注:此处的Hash函数可以和之前计算服务器映射至Hash环的函数不同,只要保证取值范围和Hash环的范围相同即可(即:2^32);
将Key映射至服务器遵循下面的逻辑:
从缓存对象key的位置开始,沿顺时针方向遇到的第一个服务器,便是当前对象将要缓存到的服务器;
假设我们有 "semlinker"、"kakuqo"、"lolo"、"fer" 四个对象,分别简写为 o1、o2、o3 和 o4;
首先,使用哈希函数计算这个对象的 hash 值,值的范围是 [0, 2^32-1]:
图中对象的映射关系如下:
hash(o1)=k1; hash(o2)=k2;
hash(o3)=k3; hash(o4)=k4;
同时 3 台缓存服务器,分别为 CS1、CS2 和 CS3:
则可知,各对象和服务器的映射关系如下:
K1=> CS1
K4=> CS3
K2=> CS2
K3=> CS1
即:
以上便是一致性Hash的工作原理;
可以看到,一致性Hash就是:将原本单个点的Hash映射,转变为了在一个环上的某个片段上的映射!
下面我们来看几种服务器扩缩容的场景;
假设 CS3 服务器出现故障导致服务下线,这时原本存储于 CS3 服务器的对象 o4,需要被重新分配至 CS2 服务器,其它对象仍存储在原有的机器上:
此时受影响的数据只有 CS2 和 CS3 服务器之间的部分数据!
假如业务量激增,我们需要增加一台服务器 CS4,经过同样的 hash 运算,该服务器最终落于 t1 和 t2 服务器之间,具体如下图所示:
此时,只有 t1 和 t2 服务器之间的部分对象需要重新分配;
在以上示例中只有 o3 对象需要重新分配,即它被重新到 CS4 服务器;
在前面我们已经说过:如果使用简单的取模方法,当新添加服务器时可能会导致大部分缓存失效,而使用一致性哈希算法后,这种情况得到了较大的改善,因为只有少部分对象需要重新分配!
在上面给出的例子中,各个服务器几乎是平均被均摊到Hash环上;
但是在实际场景中很难选取到一个Hash函数这么完美的将各个服务器散列到Hash环上;
此时,在服务器节点数量太少的情况下,很容易因为节点分布不均匀而造成数据倾斜问题;
如下图被缓存的对象大部分缓存在node-4服务器上,导致其他节点资源浪费,系统压力大部分集中在node-4节点上,这样的集群是非常不健康的:
同时,还有另一个问题:
在上面新增服务器 CS4 时,CS4 只分担了 CS1 服务器的负载,服务器 CS2 和 CS3 并没有因为 CS4 服务器的加入而减少负载压力;如果 CS4 服务器的性能与原有服务器的性能一致甚至可能更高,那么这种结果并不是我们所期望的;
针对上面的问题,我们可以通过:引入虚拟节点来解决负载不均衡的问题:
即将每台物理服务器虚拟为一组虚拟服务器,将虚拟服务器放置到哈希环上,如果要确定对象的服务器,需先确定对象的虚拟服务器,再由虚拟服务器确定物理服务器;
如下图所示:
在图中:o1 和 o2 表示对象,v1 ~ v6 表示虚拟服务器,s1 ~ s3 表示实际的物理服务器;
虚拟节点的hash计算通常可以采用:对应节点的IP地址加数字编号后缀 hash(10.24.23.227#1) 的方式;
举个例子,node-1节点IP为10.24.23.227,正常计算node-1的hash值:
假设我们给node-1设置三个虚拟节点,node-1#1、node-1#2、node-1#3,对它们进行hash后取模:
注意:
分配的虚拟节点个数越多,映射在hash环上才会越趋于均匀,节点太少的话很难看出效果;
引入虚拟节点的同时也增加了新的问题,要做虚拟节点和真实节点间的映射,对象key->虚拟节点->实际节点之间的转换;
一致性hash在分布式系统中应该是实现负载均衡的首选算法,它的实现比较灵活,既可以在客户端实现,也可以在中间件上实现,比如日常使用较多的缓存中间件memcached和redis集群都有用到它;
memcached的集群比较特殊,严格来说它只能算是伪集群,因为它的服务器之间不能通信,请求的分发路由完全靠客户端来的计算出缓存对象应该落在哪个服务器上,而它的路由算法用的就是一致性hash;
还有redis集群中hash槽的概念,虽然实现不尽相同,但思想万变不离其宗,看完本篇的一致性hash,你再去理解redis槽位就轻松多了;
其它的应用场景还有很多:
下面我们根据上面的讲述,使用Golang实现一个一致性Hash算法,这个算法具有一些下面的功能特性:
具体源代码见:
https://github.com/JasonkayZK/consistent-hashing-demo
下面开始实现吧!
首先定义每一台缓存服务器的数据结构:
core/host.go
type Host struct {
// the host id: ip:port
Name string
// the load bound of the host
LoadBound int64
}
其中:
其次,定义一致性Hash的结构:
core/algorithm.go
// Consistent is an implementation of consistent-hashing-algorithm
type Consistent struct {
// the number of replicas
replicaNum int
// the total loads of all replicas
totalLoad int64
// the hash function for keys
hashFunc func(key string) uint64
// the map of virtual nodes to hosts
hostMap map[string]*Host
// the map of hashed virtual nodes to host name
replicaHostMap map[uint64]string
// the hash ring
sortedHostsHashSet []uint64
// the hash ring lock
sync.RWMutex
}
其中:
大概的结构如上所示,下面我们来看一些常量和错误的定义;
常量的定义如下:
core/algorithm.go
const (
// The format of the host replica name
hostReplicaFormat=`%s%d`
)
var (
// the default number of replicas
defaultReplicaNum=10
// the load bound factor
// ref: https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
loadBoundFactor=0.25
// the default Hash function for keys
defaultHashFunc=func(key string) uint64 {
out :=sha512.Sum512([]byte(key))
return binary.LittleEndian.Uint64(out[:])
}
)
分别表示:
还有一些错误的定义:
core/error.go
var (
ErrHostAlreadyExists=errors.New("host already exists")
ErrHostNotFound=errors.New("host not found")
)
分别表示服务器已经注册,以及缓存服务器未找到;
下面来看具体的方法实现!
注册缓存服务器的代码如下:
core/algorithm.go
func (c *Consistent) RegisterHost(hostName string) error {
c.Lock()
defer c.Unlock()
if _, ok :=c.hostMap[hostName]; ok {
return ErrHostAlreadyExists
}
c.hostMap[hostName]=&Host{
Name: hostName,
LoadBound: 0,
}
for i :=0; i < c.replicaNum; i++ {
hashedIdx :=c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i))
c.replicaHostMap[hashedIdx]=hostName
c.sortedHostsHashSet=append(c.sortedHostsHashSet, hashedIdx)
}
// sort hashes in ascending order
sort.Slice(c.sortedHostsHashSet, func(i int, j int) bool {
if c.sortedHostsHashSet[i] < c.sortedHostsHashSet[j] {
return true
}
return false
})
return nil
}
代码比较简单,简单说一下;
首先,检查服务器是否已经注册,如果已经注册,则直接返回已经注册的错误;
随后,创建一个Host对象,并且在 for 循环中创建多个虚拟节点:
最后,对Hash环进行排序;
这里使用数组作为Hash环只是为了便于说明,在实际实现中建议选用其他数据结构进行实现,以获取更好的性能;
当缓存服务器信息写入 replicaHostMap 映射以及 Hash 环后,即完成了缓存服务器的注册;
注销缓存服务器的代码如下:
core/algorithm.go
func (c *Consistent) UnregisterHost(hostName string) error {
c.Lock()
defer c.Unlock()
if _, ok :=c.hostMap[hostName]; !ok {
return ErrHostNotFound
}
delete(c.hostMap, hostName)
for i :=0; i < c.replicaNum; i++ {
hashedIdx :=c.hashFunc(fmt.Sprintf(hostReplicaFormat, hostName, i))
delete(c.replicaHostMap, hashedIdx)
c.delHashIndex(hashedIdx)
}
return nil
}
// Remove hashed host index from the hash ring
func (c *Consistent) delHashIndex(val uint64) {
idx :=-1
l :=0
r :=len(c.sortedHostsHashSet) - 1
for l <=r {
m :=(l + r) / 2
if c.sortedHostsHashSet[m]==val {
idx=m
break
} else if c.sortedHostsHashSet[m] < val {
l=m + 1
} else if c.sortedHostsHashSet[m] > val {
r=m - 1
}
}
if idx !=-1 {
c.sortedHostsHashSet=append(c.sortedHostsHashSet[:idx], c.sortedHostsHashSet[idx+1:]...)
}
}
和注册缓存服务器相反,将服务器在 Map 映射以及 Hash 环中去除即完成了注销;
这里的逻辑和上面注册的逻辑极为类似,这里不再赘述!
查询 Key 是整个一致性 Hash 算法的核心,但是实现起来也并不复杂;
代码如下:
core/algorithm.go
func (c *Consistent) GetKey(key string) (string, error) {
hashedKey :=c.hashFunc(key)
idx :=c.searchKey(hashedKey)
return c.replicaHostMap[c.sortedHostsHashSet[idx]], nil
}
func (c *Consistent) searchKey(key uint64) int {
idx :=sort.Search(len(c.sortedHostsHashSet), func(i int) bool {
return c.sortedHostsHashSet[i] >=key
})
if idx >=len(c.sortedHostsHashSet) {
// make search as a ring
idx=0
}
return idx
}
代码首先计算 key 的散列值;
随后,在Hash环上“顺时针”寻找可以缓存的第一台缓存服务器:
idx :=sort.Search(len(c.sortedHostsHashSet), func(i int) bool {
return c.sortedHostsHashSet[i] >=key
})
注意到,如果 key 比当前Hash环中最大的虚拟节点的 hash 值还大,则选择当前 Hash环 中 hash 值最小的一个节点(即“环形”的逻辑):
if idx >=len(c.sortedHostsHashSet) {
// make search as a ring
idx=0
}
searchKey 返回了虚拟节点在 Hash 环数组中的 index;
随后,我们使用 map 返回 index 对应的缓存服务器的名称即可;
至此,一致性 Hash 算法基本实现,接下来我们来验证一下;
在验证算法之前,我们还需要准备几台缓存服务器;
为了简单起见,这里使用了 HTTP 服务器作为缓存服务器,具体代码如下所示:
server/main.go
package main
import (
"flag"
"fmt"
"net/http"
"sync"
"time"
)
type CachedMap struct {
KvMap sync.Map
Lock sync.RWMutex
}
var (
cache=CachedMap{KvMap: sync.Map{}}
port=flag.String("p", "8080", "port")
regHost="http://localhost:18888"
expireTime=10
)
func main() {
flag.Parse()
stopChan :=make(chan interface{})
startServer(*port)
<-stopChan
}
func startServer(port string) {
hostName :=fmt.Sprintf("localhost:%s", port)
fmt.Printf("start server: %s\n", port)
err :=registerHost(hostName)
if err !=nil {
panic(err)
}
http.HandleFunc("/", kvHandle)
err=http.ListenAndServe(":"+port, nil)
if err !=nil {
err=unregisterHost(hostName)
if err !=nil {
panic(err)
}
panic(err)
}
}
func kvHandle(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
if _, ok :=cache.KvMap.Load(r.Form["key"][0]); !ok {
val :=fmt.Sprintf("hello: %s", r.Form["key"][0])
cache.KvMap.Store(r.Form["key"][0], val)
fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val)
time.AfterFunc(time.Duration(expireTime)*time.Second, func() {
cache.KvMap.Delete(r.Form["key"][0])
fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val)
})
}
val, _ :=cache.KvMap.Load(r.Form["key"][0])
_, err :=fmt.Fprintf(w, val.(string))
if err !=nil {
panic(err)
}
}
func registerHost(host string) error {
resp, err :=http.Get(fmt.Sprintf("%s/register?host=%s", regHost, host))
if err !=nil {
return err
}
defer resp.Body.Close()
return nil
}
func unregisterHost(host string) error {
resp, err :=http.Get(fmt.Sprintf("%s/unregister?host=%s", regHost, host))
if err !=nil {
return err
}
defer resp.Body.Close()
return nil
}
代码接受由命令行指定的 -p 参数指定服务器端口号;
代码执行后,会调用 startServer 函数启动一个http服务器;
在 startServer 函数中,首先调用 registerHost 在代理服务器上进行注册(下文会讲),并监听 /路径,具体代码如下:
func startServer(port string) {
hostName :=fmt.Sprintf("localhost:%s", port)
fmt.Printf("start server: %s\n", port)
err :=registerHost(hostName)
if err !=nil {
panic(err)
}
http.HandleFunc("/", kvHandle)
err=http.ListenAndServe(":"+port, nil)
if err !=nil {
err=unregisterHost(hostName)
if err !=nil {
panic(err)
}
panic(err)
}
}
kvHandle 函数对请求进行处理:
func kvHandle(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
if _, ok :=cache.KvMap.Load(r.Form["key"][0]); !ok {
val :=fmt.Sprintf("hello: %s", r.Form["key"][0])
cache.KvMap.Store(r.Form["key"][0], val)
fmt.Printf("cached key: {%s: %s}\n", r.Form["key"][0], val)
time.AfterFunc(time.Duration(expireTime)*time.Second, func() {
cache.KvMap.Delete(r.Form["key"][0])
fmt.Printf("removed cached key after 3s: {%s: %s}\n", r.Form["key"][0], val)
})
}
val, _ :=cache.KvMap.Load(r.Form["key"][0])
_, err :=fmt.Fprintf(w, val.(string))
if err !=nil {
panic(err)
}
}
首先,解析来自路径的参数:?key=xxx;
随后,查询服务器中的缓存(为了简单起见,这里使用 sync.Map 来模拟缓存):
最后,返回缓存;
有了缓存服务器之后,我们还需要一个代理服务器来选择具体选择哪个缓存服务器来请求;
代码如下:
proxy/proxy.go
package proxy
import (
"fmt"
"github.com/jasonkayzk/consistent-hashing-demo/core"
"io/ioutil"
"net/http"
"time"
)
type Proxy struct {
consistent *core.Consistent
}
// NewProxy creates a new Proxy
func NewProxy(consistent *core.Consistent) *Proxy {
proxy :=&Proxy{
consistent: consistent,
}
return proxy
}
func (p *Proxy) GetKey(key string) (string, error) {
host, err :=p.consistent.GetKey(key)
if err !=nil {
return "", err
}
resp, err :=http.Get(fmt.Sprintf("http://%s?key=%s", host, key))
if err !=nil {
return "", err
}
defer resp.Body.Close()
body, _ :=ioutil.ReadAll(resp.Body)
fmt.Printf("Response from host %s: %s\n", host, string(body))
return string(body), nil
}
func (p *Proxy) RegisterHost(host string) error {
err :=p.consistent.RegisterHost(host)
if err !=nil {
return err
}
fmt.Println(fmt.Sprintf("register host: %s success", host))
return nil
}
func (p *Proxy) UnregisterHost(host string) error {
err :=p.consistent.UnregisterHost(host)
if err !=nil {
return err
}
fmt.Println(fmt.Sprintf("unregister host: %s success", host))
return nil
}
代理服务器的逻辑很简单,就是创建一个一致性Hash结构: Consistent,把 Consistent 和请求缓存服务器的逻辑进行了一层封装;
启动代理服务器的代码如下:
package main
import (
"fmt"
"github.com/jasonkayzk/consistent-hashing-demo/core"
"github.com/jasonkayzk/consistent-hashing-demo/proxy"
"net/http"
)
var (
port="18888"
p=proxy.NewProxy(core.NewConsistent(10, nil))
)
func main() {
stopChan :=make(chan interface{})
startServer(port)
<-stopChan
}
func startServer(port string) {
http.HandleFunc("/register", registerHost)
http.HandleFunc("/unregister", unregisterHost)
http.HandleFunc("/key", getKey)
fmt.Printf("start proxy server: %s\n", port)
err :=http.ListenAndServe(":"+port, nil)
if err !=nil {
panic(err)
}
}
func registerHost(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
err :=p.RegisterHost(r.Form["host"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("register host: %s success", r.Form["host"][0]))
}
func unregisterHost(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
err :=p.UnregisterHost(r.Form["host"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("unregister host: %s success", r.Form["host"][0]))
}
func getKey(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
val, err :=p.GetKey(r.Form["key"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))
}
和缓存服务器类似,这里采用 HTTP 服务器来模拟;
代理服务器监听 18888 端口的几个路由:
这里为了简单起见,使用了这种方式进行服务注册,实际使用时请使用其他组件进行实现!
接下来启动缓存服务器:
start proxy server: 18888
分别启动三个缓存服务器:
$ go run server/main.go -p 8080
start server: 8080
$ go run server/main.go -p 8081
start server: 8081
$ go run server/main.go -p 8082
start server: 8082
同时,代理服务器输出:
register host: localhost:8080 success
register host: localhost:8081 success
register host: localhost:8082 success
可以看到缓存服务器已经成功注册;
可以使用 curl 命令请求代理服务器获取缓存 key:
$ curl localhost:18888/key?key=123
key: 123, val: hello: 123
此时,代理服务器输出:
Response from host localhost:8080: hello: 123
同时,8000端口的缓存服务器输出:
cached key: {123: hello: 123}
removed cached key after 10s: {123: hello: 123}
可以看到,8000端口的服务器对key值进行了缓存,并在10秒后清除了缓存;
尝试获取多个Key:
Response from host localhost:8082: hello: 45363456
Response from host localhost:8080: hello: 4
Response from host localhost:8082: hello: 1
Response from host localhost:8080: hello: 2
Response from host localhost:8082: hello: 3
Response from host localhost:8080: hello: 4
Response from host localhost:8082: hello: 5
Response from host localhost:8080: hello: 6
Response from host localhost:8082: hello: sdkbnfoerwtnbre
Response from host localhost:8082: hello: sd45555254tg423i5gvj4v5
Response from host localhost:8081: hello: 0
Response from host localhost:8082: hello: 032452345
可以看到不同的key被散列到了不同的缓存服务器;
接下来我们通过debug查看具体的变量来一探究竟;
开启debug,并注册单个缓存服务器后,查看 Consistent 中的值:
注册三个缓存服务器后,查看 Consistent 中的值:
从debug中的变量,我们就可以很清楚的看到注册不同数量的服务器时,一致性Hash上服务器的动态变化!
以上就是基本的一致性Hash算法的实现了!
但是很多时候,我们的缓存服务器需要同时处理大量的缓存请求,而通过上面的算法,我们总是会去同一台缓存服务器去获取缓存数据;
如果很多的热点数据都落在了同一台缓存服务器上,则可能会出现性能瓶颈;
Google 在2017年提出了: 含有负载边界值的一致性Hash算法;
下面我们在基本的一致性Hash算法的基础上,实现含有负载边界值的一致性Hash!
17年时,Google 提出了含有负载边界值的一致性Hash算法,此算法主要应用于在实现一致性的同时,实现负载的平均性;
此算法最初由 Vimeo 的 Andrew Rodland 在 haproxy 中实现并开源;
参考:
https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
arvix论文地址:
https://arxiv.org/abs/1608.01350
这个算法将缓存服务器视为一个含有一定容量的桶(可以简单理解为Hash桶),将客户端视为球,则平均性目标表示为:所有约等于平均密度(球的数量除以桶的数量):
实际使用时,可以设定一个平均密度的参数 ε,将每个桶的容量设置为平均加载时间的 下上限 (1+ε);
具体的计算过程如下:
例如下面的图:
使用哈希函数将 6 个球和 3 个桶分配给 Hash环 上的随机位置,假设每个桶的容量设置为 2,按 ID 值的递增顺序分配球;
在上面基本一致性 Hash 算法实现的基础上,我们继续实现含有负载边界值的一致性Hash算法;
在核心算法中添加根据负载情况查询Key的函数,以及增加/释放负载值的函数;
根据负载情况查询 Key 的函数:
core/algorithm.go
func (c *Consistent) GetKeyLeast(key string) (string, error) {
c.RLock()
defer c.RUnlock()
if len(c.replicaHostMap)==0 {
return "", ErrHostNotFound
}
hashedKey :=c.hashFunc(key)
idx :=c.searchKey(hashedKey) // Find the first host that may serve the key
i :=idx
for {
host :=c.replicaHostMap[c.sortedHostsHashSet[i]]
loadChecked, err :=c.checkLoadCapacity(host)
if err !=nil {
return "", err
}
if loadChecked {
return host, nil
}
i++
// if idx goes to the end of the ring, start from the beginning
if i >=len(c.replicaHostMap) {
i=0
}
}
}
func (c *Consistent) checkLoadCapacity(host string) (bool, error) {
// a safety check if someone performed c.Done more than needed
if c.totalLoad < 0 {
c.totalLoad=0
}
var avgLoadPerNode float64
avgLoadPerNode=float64((c.totalLoad + 1) / int64(len(c.hostMap)))
if avgLoadPerNode==0 {
avgLoadPerNode=1
}
avgLoadPerNode=math.Ceil(avgLoadPerNode * (1 + loadBoundFactor))
candidateHost, ok :=c.hostMap[host]
if !ok {
return false, ErrHostNotFound
}
if float64(candidateHost.LoadBound)+1 <=avgLoadPerNode {
return true, nil
}
return false, nil
}
在 GetKeyLeast 函数中,首先根据 searchKey 函数,顺时针获取可能满足条件的第一个虚拟节点;
随后调用 checkLoadCapacity 校验当前缓存服务器的负载数是否满足条件:
如果不满足条件,则沿着 Hash 环走到下一个虚拟节点,继续判断是否满足条件,直到满足条件;
这里使用的是无条件的 for 循环,因为一定存在低于 平均负载(1 + loadBoundFactor) 的虚拟节点!*
增加/释放负载值的函数:
core/algorithm.go
func (c *Consistent) Inc(hostName string) {
c.Lock()
defer c.Unlock()
atomic.AddInt64(&c.hostMap[hostName].LoadBound, 1)
atomic.AddInt64(&c.totalLoad, 1)
}
func (c *Consistent) Done(host string) {
c.Lock()
defer c.Unlock()
if _, ok :=c.hostMap[host]; !ok {
return
}
atomic.AddInt64(&c.hostMap[host].LoadBound, -1)
atomic.AddInt64(&c.totalLoad, -1)
}
逻辑比较简单,就是原子的对对应缓存服务器进行负载加减一操作;
在代理服务器中增加路由:
proxy/proxy.go
func (p *Proxy) GetKeyLeast(key string) (string, error) {
host, err :=p.consistent.GetKeyLeast(key)
if err !=nil {
return "", err
}
p.consistent.Inc(host)
time.AfterFunc(time.Second*10, func() { // drop the host after 10 seconds(for testing)!
fmt.Printf("dropping host: %s after 10 second\n", host)
p.consistent.Done(host)
})
resp, err :=http.Get(fmt.Sprintf("http://%s?key=%s", host, key))
if err !=nil {
return "", err
}
defer resp.Body.Close()
body, _ :=ioutil.ReadAll(resp.Body)
fmt.Printf("Response from host %s: %s\n", host, string(body))
return string(body), nil
}
注意:这里模拟的是单个key请求可能会持续10s钟;
启动代理服务器时增加路由:
main.go
func startServer(port string) {
// ......
http.HandleFunc("/key_least", getKeyLeast)
// ......
}
func getKeyLeast(w http.ResponseWriter, r *http.Request) {
_=r.ParseForm()
val, err :=p.GetKeyLeast(r.Form["key"][0])
if err !=nil {
w.WriteHeader(http.StatusInternalServerError)
_, _=fmt.Fprintf(w, err.Error())
return
}
_, _=fmt.Fprintf(w, fmt.Sprintf("key: %s, val: %s", r.Form["key"][0], val))
}
启动代理服务器,并开启三台缓存服务器;
通过下面的命令获取含有负载边界的Key:
$ curl localhost:18888/key_least?key=123
key: 123, val: hello: 123
多次请求后的结果如下:
```
start proxy server: 18888
register host: localhost:8080 success
register host: localhost:8081 success
register host: localhost:8082 success
Response from host localhost:8080: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8081: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8080: hello: 123
Response from host localhost:8082: hello: 123
Response from host localhost:8082: hello: 123
```
可以看到,缓存被均摊到了其他服务器(这是由于一个缓存请求会持续10s导致的)!
本文抛砖引玉的讲解了一致性Hash算法的原理,并提供了Go的实现;
在此基础之上,根据 Google 的论文实现了带有负载边界的一致性Hash算法;
当然上面的代码在实际生产环境下仍然需要部分改进,如:
大家在实际使用时,可以根据需要,搭配实际的组件!
希表和哈希函数
在记录的存储位置和它的关键字之间是建立一个确定的对应关系(映射函数),使每个关键字和一个存储位置能唯一对应。
这个映射函数称为哈希函数,根据这个原则建立的表称为哈希表(Hash Table),也叫散列表。
以上描述,如果通过数学形式来描述就是:
若查找关键字为 key,则其值存放在 f(key) 的存储位置上。由此,不需比较便可直接取得所查记录。
注:哈希查找与线性表查找和树表查找最大的区别在于,不用数值比较。
冲突
若 key1 ≠ key2 ,而 f(key1)=f(key2),这种情况称为冲突(Collision)。
根据哈希函数f(key)和处理冲突的方法将一组关键字映射到一个有限的连续的地址集(区间)上,并以关键字在地址集中的“像”作为记录在表中的存储位置,这一映射过程称为构造哈希表。
构造哈希表这个场景就像汽车找停车位,如果车位被人占了,只能找空的地方停。
构造哈希表
由以上内容可知,哈希查找本身其实不费吹灰之力,问题的关键在于如何构造哈希表和处理冲突。
常见的构造哈希表的方法有 5 种:
(1)直接定址法
说白了,就是小学时学过的一元一次方程。
即 f(key)=a * key + b。其中,a和b 是常数。
(2)数字分析法
假设关键字是R进制数(如十进制)。并且哈希表中可能出现的关键字都是事先知道的,则可选取关键字的若干数位组成哈希地址。
选取的原则是使得到的哈希地址尽量避免冲突,即所选数位上的数字尽可能是随机的。
(3)平方取中法
取关键字平方后的中间几位为哈希地址。通常在选定哈希函数时不一定能知道关键字的全部情况,仅取其中的几位为地址不一定合适;
而一个数平方后的中间几位数和数的每一位都相关, 由此得到的哈希地址随机性更大。取的位数由表长决定。
(4)除留余数法
取关键字被某个不大于哈希表表长 m 的数 p 除后所得的余数为哈希地址。
即 f(key)=key % p (p ≤ m)
这是一种最简单、最常用的方法,它不仅可以对关键字直接取模,也可在折叠、平方取中等运算之后取模。
注意:p的选择很重要,如果选的不好,容易产生冲突。根据经验,一般情况下可以选p为素数。
(5)随机数法
选择一个随机函数,取关键字的随机函数值为它的哈希地址,即 f(key)=random(key)。
通常,在关键字长度不等时采用此法构造哈希函数较为恰当。
解决冲突
设计合理的哈希函数可以减少冲突,但不能完全避免冲突。
所以需要有解决冲突的方法,常见有两类
(1)开放定址法
如果两个数据元素的哈希值相同,则在哈希表中为后插入的数据元素另外选择一个表项。
当程序查找哈希表时,如果没有在第一个对应的哈希表项中找到符合查找要求的数据元素,程序就会继续往后查找,直到找到一个符合查找要求的数据元素,或者遇到一个空的表项。
例子
若要将一组关键字序列 {1, 9, 25, 11, 12, 35, 17, 29} 存放到哈希表中。
采用除留余数法构造哈希表;采用开放定址法处理冲突。
不妨设选取的p和m为13,由 f(key)=key % 13 可以得到下表。
需要注意的是,在上图中有两个关键字的探查次数为 2 ,其他都是1。
这个过程是这样的:
a. 12 % 13 结果是12,而它的前面有个 25 ,25 % 13 也是12,存在冲突。
我们使用开放定址法 (12 + 1) % 13=0,没有冲突,完成。
b. 35 % 13 结果是 9,而它的前面有个 9,9 % 13也是 9,存在冲突。
我们使用开放定址法 (9 + 1) % 13=10,没有冲突,完成。
(2)拉链法
将哈希值相同的数据元素存放在一个链表中,在查找哈希表的过程中,当查找到这个链表时,必须采用线性查找方法。
在这种方法中,哈希表中每个单元存放的不再是记录本身,而是相应同义词单链表的头指针。
例子
如果对开放定址法例子中提到的序列使用拉链法,得到的结果如下图所示:
实现一个哈希表
假设要实现一个哈希表,要求
a. 哈希函数采用除留余数法,即 f(key)=key % p (p ≤ m)
b. 解决冲突采用开放定址法,即 f2(key)=(f(key)+i) % size (p ≤ m)
(1)定义哈希表的数据结构
class HashTable {
public int key=0; // 关键字
public int data=0; // 数值
public int count=0; // 探查次数
}
(2)在哈希表中查找关键字key
根据设定的哈希函数,计算哈希地址。如果出现地址冲突,则按设定的处理冲突的方法寻找下一个地址。
如此反复,直到不冲突为止(查找成功)或某个地址为空(查找失败)。
/**
* 查找哈希表
* 构造哈希表采用除留取余法,即f(key)=key mod p (p ≤ size)
* 解决冲突采用开放定址法,即f2(key)=(f(key) + i) mod p (1 ≤ i ≤ size-1)
* ha为哈希表,p为模,size为哈希表大小,key为要查找的关键字
*/
public int searchHashTable(HashTable[] ha, int p, int size, int key) {
int addr=key % p; // 采用除留取余法找哈希地址
// 若发生冲突,用开放定址法找下一个哈希地址
while (ha[addr].key !=KEY && ha[addr].key !=key) {
addr=(addr + 1) % size;
}
if (ha[addr].key==key) {
return addr; // 查找成功
} else {
return FAILED; // 查找失败
}
}
(3)删除关键字为key的记录
在采用开放定址法处理冲突的哈希表上执行删除操作,只能在被删记录上做删除标记,而不能真正删除记录。
找到要删除的记录,将关键字置为删除标记DELKEY。
public int deleteHashTable(HashTable[] ha, int p, int size, int key) {
int addr=0;
addr=searchHashTable(ha, p, size, key);
if (FAILED !=addr) { // 找到记录
ha[addr].key=DELKEY; // 将该位置的关键字置为DELKEY
return SUCCESS;
} else {
return KEY; // 查找不到记录,直接返回KEY
}
}
(4)插入关键字为key的记录
将待插入的关键字key插入哈希表
先调用查找算法,若在表中找到待插入的关键字,则插入失败;
若在表中找到一个开放地址,则将待插入的结点插入到其中,则插入成功。
public void insertHashTable(HashTable[] ha, int p, int size, int key) {
int i=1;
int addr=0;
addr=key % p; // 通过哈希函数获取哈希地址
if (ha[addr].key==KEY || ha[addr].key==DELKEY) { // 如果没有冲突,直接插入
ha[addr].key=key;
ha[addr].count=1;
} else { // 如果有冲突,使用开放定址法处理冲突
do {
addr=(addr + 1) % size; // 寻找下一个哈希地址
i++;
} while (ha[addr].key !=KEY && ha[addr].key !=DELKEY);
ha[addr].key=key;
ha[addr].count=i;
}
}
(5)建立哈希表
先将哈希表中各关键字清空,使其地址为开放的,然后调用插入算法将给定的关键字序列依次插入。
public void createHashTable(HashTable[] ha, int[] list, int p, int size) {
int i=0;
// 将哈希表中的所有关键字清空
for (i=0; i < ha.length; i++) {
ha[i].key=KEY;
ha[i].count=0;
}
// 将关键字序列依次插入哈希表中
for (i=0; i < list.length; i++) {
this.insertHashTable(ha, p, size, list[i]);
}
}
完整代码
class HashTable {
public int key=0; // 关键字
public int data=0; // 数值
public int count=0; // 探查次数
}
public class HashSearch {
private final static int MAXSIZE=20;
private final static int KEY=1;
private final static int DELKEY=2;
private final static int SUCCESS=0;
private final static int FAILED=0xFFFFFFFF;
/**
* 查找哈希表
* 构造哈希表采用除留取余法,即f(key)=key mod p (p ≤ size)
* 解决冲突采用开放定址法,即f2(key)=(f(key) + i) mod p (1 ≤ i ≤ size-1)
* ha为哈希表,p为模,size为哈希表大小,key为要查找的关键字
*/
public int searchHashTable(HashTable[] ha, int p, int size, int key) {
int addr=key % p; // 采用除留取余法找哈希地址
// 若发生冲突,用开放定址法找下一个哈希地址
while (ha[addr].key !=KEY && ha[addr].key !=key) {
addr=(addr + 1) % size;
}
if (ha[addr].key==key) {
return addr; // 查找成功
} else {
return FAILED; // 查找失败
}
}
/**
* 删除哈希表中关键字为key的记录
* 找到要删除的记录,将关键字置为删除标记DELKEY
*/
public int deleteHashTable(HashTable[] ha, int p, int size, int key) {
int addr=0;
addr=searchHashTable(ha, p, size, key);
if (FAILED !=addr) { // 找到记录
ha[addr].key=DELKEY; // 将该位置的关键字置为DELKEY
return SUCCESS;
} else {
return KEY; // 查找不到记录,直接返回KEY
}
}
/**
* 将待插入的关键字key插入哈希表
* 先调用查找算法,若在表中找到待插入的关键字,则插入失败;
* 若在表中找到一个开放地址,则将待插入的结点插入到其中,则插入成功。
*/
public void insertHashTable(HashTable[] ha, int p, int size, int key) {
int i=1;
int addr=0;
addr=key % p; // 通过哈希函数获取哈希地址
if (ha[addr].key==KEY || ha[addr].key==DELKEY) { // 如果没有冲突,直接插入
ha[addr].key=key;
ha[addr].count=1;
} else { // 如果有冲突,使用开放定址法处理冲突
do {
addr=(addr + 1) % size; // 寻找下一个哈希地址
i++;
} while (ha[addr].key !=KEY && ha[addr].key !=DELKEY);
ha[addr].key=key;
ha[addr].count=i;
}
}
/**
* 创建哈希表
* 先将哈希表中各关键字清空,使其地址为开放的,然后调用插入算法将给定的关键字序列依次插入。
*/
public void createHashTable(HashTable[] ha, int[] list, int p, int size) {
int i=0
// 将哈希表中的所有关键字清空
for (i=0; i < ha.length; i++) {
ha[i].key=KEY;
ha[i].count=0;
}
// 将关键字序列依次插入哈希表中
for (i=0; i < list.length; i++) {
this.insertHashTable(ha, p, size, list[i]);
}
}
/**
* 输出哈希表
*/
public void displayHashTable(HashTable[] ha) {
int i=0;
System.out.format("pos:\t", "pos");
for (i=0; i < ha.length; i++) {
System.out.format("%4d", i);
}
System.out.println;
System.out.format("key:\t");
for (i=0; i < ha.length; i++) {
if (ha[i].key !=KEY) {
System.out.format("%4d", ha[i].key);
} else {
System.out.format(" ");
}
}
System.out.println;
System.out.format("count:\t");
for (i=0; i < ha.length; i++) {
if (0 !=ha[i].count) {
System.out.format("%4d", ha[i].count);
} else {
System.out.format(" ");
}
}
System.out.println;
}
public static void main(String[] args) {
int list={ 3, 112, 245, 27, 44, 19, 76, 29, 90 };
HashTable ha=new HashTable[MAXSIZE];
for (int i=0; i < ha.length; i++) {
ha[i]=new HashTable;
}
HashSearch search=new HashSearch;
search.createHashTable(ha, list, 19, MAXSIZE);
search.displayHashTable(ha);
}
}
参考资料
《数据结构习题与解析》(B级第3版)
转自:静默虚空
http://www.cnblogs.com/jingmoxukong/p/4332252.html
-----这里是数学思维的聚集地------
“超级数学建模”(微信号supermodeling),每天学一点小知识,轻松了解各种思维,做个好玩的理性派。50万数学精英都在关注!
篇文章给大家带来的内容是关于JavaScript中散列表(哈希表)的详细介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
散列表
散列表(Hash table,也叫哈希表),是根据键(Key)而直接访问在内存存储位置的数据结构。也就是说,它通过计算一个关于键值的函数,将所需查询的数据映射到表中一个位置来访问记录,这加快了查找速度。这个映射函数称做散列函数,存放记录的数组称做散列表。
我们从上图开始分析
通过上面简单的例子,应该会有一下几点一个大致的理解
那么我们在接着看一般我们会怎么去取值呢?
比如我们存储一个key为1000,value为'张三' ---> {key:1000,value:'张三'}
从我们上述的解释,它是不是应该存放在1000%10的这个插槽里。
当我们通过key想要找到value张三,是不是到key%10这个插槽里找就可以了呢?到了这里你可以停下来思考一下。
散列的一些术语(可以简单的看一下)
看到这里不知道你是否大致理解了散列函数是什么了没。通过例子,再通过你的思考,你可以回头在读一遍文章头部关于散列表的定义。如果你能读懂了,那么我估计你应该是懂了。
常用的散列函数
处理整数 h=>k%M (也就是我们上面所举的例子)
处理字符串:
function h_str(str,M){
return [...str].reduce((hash,c)=>{
hash=(31*hash + c.charCodeAt(0)) % M
},0)
}
hash算法不是这里的重点,我也没有很深入的去研究,这里主要还是去理解散列表是个怎样的数据结构,它有哪些优点,它具体做了怎样一件事。
而hash函数它只是通过某种算法把key映射到列表中。
构建散列表
通过上面的解释,我们这里做一个简单的散列表
散列表的组成
初始化
- 初始化散列表有多少个槽
- 用一个数组来创建M个槽
class HashTable {
constructor(num=1000){
this.M=num;
this.slots=new Array(num);
}
}
散列函数
处理字符串的散列函数,这里使用字符串是因为,数值也可以传换成字符串比较通用一些
先将传递过来的key值转为字符串,为了能够严谨一些
将字符串转换为数组, 例如'abc'=> [...'abc']=> ['a','b','c']
分别对每个字符的charCodeAt进行计算,取M余数是为了刚好对应插槽的数量,你总共就10个槽,你的数值%10 肯定会落到 0-9的槽里
h(str){
str=str + '';
return [...str].reduce((hash,c)=>{
hash=(331 * hash + c.charCodeAt()) % this.M;
return hash;
},0)
}
添加
调用hash函数得到对应的存储地址(就是我们之间类比的槽)
因为一个槽中可能会存多个值,所以需要用一个二维数组去表示,比如我们计算得来的槽的编号是0,也就是slot[0],那么我们应该往slot[0] [0]里存,后面进来的同样是编号为0的槽的话就接着往slot[0] [1]里存
add(key,value) {
const h=this.h(key);
// 判断这个槽是否是一个二维数组, 不是则创建二维数组
if(!this.slots[h]){
this.slots[h]=[];
}
// 将值添加到对应的槽中
this.slots[h].push(value);
}
删除
通过hash算法,找到所在的槽
通过过滤来删除
delete(key){
const h=this.h(key);
this.slots[h]=this.slots[h].filter(item=>item.key!==key);
}
查找
通过hash算法找到对应的槽
用find函数去找同一个key的值
返回对应的值
search(key){
const h=this.h(key);
const list=this.slots[h];
const data=list.find(x=> x.key===key);
return data ? data.value : null;
}
总结
讲到这里,散列表的数据结构已经讲完了,其实我们每学一种数据结构或算法的时候,不是去照搬实现的代码,我们要学到的是思想,比如说散列表它究竟做了什么,它是一种存储方式,可以快速的通过键去查找到对应的值。那么我们会思考,如果我们设计的槽少了,在同一个槽里存放了大量的数据,那么这个散列表它的搜索速度肯定是会大打折扣的,这种情况又应该用什么方式去解决,又或者是否用其他的数据结构的代替它。
补充一个小知识点
v8引擎中的数组 arr=[1,2,3,4,5] 或 new Array(100) 我们都知道它是开辟了一块连续的空间去存储,而arr=[] , arr[100000]=10 这样的操作它是使用的散列,因为这种操作如果连续开辟100万个空间去存储一个值,那么显然是在浪费空间。
以上就是JavaScript中散列表(哈希表)的详细介绍(代码示例)的详细内容,更多请关注其它相关文章!
更多技巧请《转发 + 关注》哦!
*请认真填写需求信息,我们会在24小时内与您取得联系。