最近我们在做数据库的技术选型,要做选型的话难免需要对数据库进行一个基准测试,以便可以横向对比不同数据库性能。
YCSB,全称为“Yahoo!Cloud Serving Benchmark”,是雅虎开发的用来对云服务进行基础测试的工具,其内部涵盖了常见的NoSQL数据库产品,如Cassandra、MongoDB、HBase、Redis等等。
作为一名go开发人员,所以我们使用 pingcap 开发的Go YCSB来进行基准测试。
首先要保证本地 Go 版本不低于 1.16,然后下载编译:
git clone https://github.com/pingcap/go-ycsb.git
cd go-ycsb
make
在 bin 文件夹里面就放着我们编译好的程序 go-ycsb。
我们先来看一下 workloads 文件夹,目录下有各种workload的模板,可以基于workload模板进行自定义修改。默认的6种测试场景如下:
所以我们可以依据不同的 workload 多维度的对系统进行测试。workload里面的操作主要包括:
在测试的时候,我们还需要根据不同的业务场景来模拟测试,那么可以通过 requestdistribution 控制:
下面我们看一下workload里面可以填哪些参数:
# 目前只实现了这一种
workload=core
# 总记录数
recordcount=1000000
# 测试阶段被操作的记录数,如果设置了 threadcount,那么每个线程操作的记录数=operationcount/threadcount
operationcount=3000000
# 线程数
threadcount=500
# 如果一个表里面已经有记录数了,那么load的数据的时候从这个记录数开始
insertstart=0
# 一行数据的字段数
fieldcount=10
# 每个字段大小
fieldlength=100
# 是否应该读取所有字段
readallfields=true
# 是否应该更新所有字段
writeallfields=false
# 字段长度分布
fieldlengthdistribution=constant
#fieldlengthdistribution=uniform
#fieldlengthdistribution=zipfian
# 读操作概率
readproportion=0.95
# 更新操作概率
updateproportion=0.05
# 插入操作概率
insertproportion=0
# 先读后写操作同一条记录概率
readmodifywriteproportion=0
# 范围操作的概率
scanproportion=0
# 范围操作,最大的可操作的记录数
maxscanlength=1000
# 用来选择扫描时访问的记录数量分布情况
scanlengthdistribution=uniform
#scanlengthdistribution=zipfian
# 记录应按顺序插入还是伪随机插入
insertorder=hashed
#insertorder=ordered
# 以什么方式模拟测试
requestdistribution=zipfian
#requestdistribution=uniform
#requestdistribution=latest
# 下面这两种方式时针对requestdistribution为hotspot的时候
# 构成热点集的数据项的百分比
hotspotdatafraction=0.2
# 访问热点集的数据操作百分比
hotspotopnfraction=0.8
# 操作数据的表名
table=usertable
# 延迟测量结果展现形式,暂时没实现
measurementtype=histogram
比如我们现在要测试 redis 的性能,先写一个 workload:
recordcount=1000000
operationcount=1000000
workload=core
readallfields=true
readmodifywriteproportion=1
requestdistribution=uniform
redis.addr=127.0.0.1:6379
threadcount=50
上面的这个 workload 表示在 load 的时候会插入100万条数据到库里面,操作的数据量也是100万,但是有50个线程,也就是每个线程实际操作2万行记录;
测试方式使用 readmodifywriteproportion,先读后写,操作记录采用 uniform 也就是随机方式进行。
先 load 数据:
./bin/go-ycsb load redis -P workloads/workloada
再运行测试:
./bin/go-ycsb run redis -P workloads/workloada
返回:
READ_MODIFY_WRITE - Takes(s): 18.8, Count: 499312, OPS: 26539.8, Avg(us): 1388, Min(us): 107, Max(us): 42760, 99th(us): 3000, 99.9th(us): 7000, 99.99th(us): 26000
当然对于我来说,肯定还是要看一下它的代码是怎么做的,学习一下大佬是如何写代码的对我们工作也是很有帮助。
对于 Go YCSB 来说,它总共有这么几个组成部分:
我们以 redis 为例先看一下,如果要测试自己的 Database 该怎么办。
在 Go YCSB 中,所有的 DB 都放在 db 这个目录下面:
所以,我们可以在这个文件夹下面创建自己的db,然后构造一个 struct ,实现 DB 这个接口:
type DB interface {
ToSqlDB() *sql.DB
Close() error
InitThread(ctx context.Context, threadID int, threadCount int) context.Context
CleanupThread(ctx context.Context)
Read(ctx context.Context, table string, key string, fields []string) (map[string][]byte, error)
Scan(ctx context.Context, table string, startKey string, count int, fields []string) ([]map[string][]byte, error)
Update(ctx context.Context, table string, key string, values map[string][]byte) error
Insert(ctx context.Context, table string, key string, values map[string][]byte) error
Delete(ctx context.Context, table string, key string) error
}
里面定义了具体的 DB 操作。
然后需要定义一个工厂,用来创建这个 DB struct,实现DBCreator接口:
type DBCreator interface {
Create(p *properties.Properties) (DB, error)
}
然后需要定义一个 init 函数,在启动的时候进行 DBCreator 注册:
func init() {
ycsb.RegisterDBCreator("redis", redisCreator{})
}
var dbCreators = map[string]DBCreator{}
func RegisterDBCreator(name string, creator DBCreator) {
_, ok := dbCreators[name]
if ok {
panic(fmt.Sprintf("duplicate register database %s", name))
}
dbCreators[name] = creator
}
RegisterDBCreator 会在初始化的时候被调用。用来获取 init 方法注册过的 DB。通过这种方式 Go YCSB 实现了 DB 的自定义化。
首先 Go YCSB 在运行的时候会使用 cobra 根据传入的是 load 还是 run 执行到下面两个不同的方法:
func runLoadCommandFunc(cmd *cobra.Command, args []string) {
runClientCommandFunc(cmd, args, false)
}
func runTransCommandFunc(cmd *cobra.Command, args []string) {
runClientCommandFunc(cmd, args, true)
}
这里会调用到 runClientCommandFunc 函数中:
func runClientCommandFunc(cmd *cobra.Command, args []string, doTransactions bool) {
dbName := args[0]
// 初始化全局参数
initialGlobal(dbName, func() {
doTransFlag := "true"
if !doTransactions {
doTransFlag = "false"
}
globalProps.Set(prop.DoTransactions, doTransFlag)
if cmd.Flags().Changed("threads") {
// We set the threadArg via command line.
globalProps.Set(prop.ThreadCount, strconv.Itoa(threadsArg))
}
if cmd.Flags().Changed("target") {
globalProps.Set(prop.Target, strconv.Itoa(targetArg))
}
if cmd.Flags().Changed("interval") {
globalProps.Set(prop.LogInterval, strconv.Itoa(reportInterval))
}
})
fmt.Println("***************** properties *****************")
for key, value := range globalProps.Map() {
fmt.Printf("\"%s\"=\"%s\"\n", key, value)
}
fmt.Println("**********************************************")
// 初始化 client
c := client.NewClient(globalProps, globalWorkload, globalDB)
start := time.Now()
// 运行测试
c.Run(globalContext)
fmt.Printf("Run finished, takes %s\n", time.Now().Sub(start))
// 测试结果输出
measurement.Output()
}
参数的初始化主要是在 initialGlobal 里面做的:
func initialGlobal(dbName string, onProperties func()) {
...
go func() {
http.ListenAndServe(addr, nil)
}()
//初始化 measurement
measurement.InitMeasure(globalProps)
if len(tableName) == 0 {
tableName = globalProps.GetString(prop.TableName, prop.TableNameDefault)
}
// 获取 WorkloadCreator
workloadName := globalProps.GetString(prop.Workload, "core")
workloadCreator := ycsb.GetWorkloadCreator(workloadName)
//创建Workload
var err error
if globalWorkload, err = workloadCreator.Create(globalProps); err != nil {
util.Fatalf("create workload %s failed %v", workloadName, err)
}
// 获取要被测试的 db
dbCreator := ycsb.GetDBCreator(dbName)
if dbCreator == nil {
util.Fatalf("%s is not registered", dbName)
}
// 创建 db
if globalDB, err = dbCreator.Create(globalProps); err != nil {
util.Fatalf("create db %s failed %v", dbName, err)
}
globalDB = client.DbWrapper{globalDB}
}
这里最主要的是创建 Workload 和 DB。Workload 里面会初始化很多配置文件里面的信息。
runClientCommandFunc 里面会调用 client 的 Run 方法执行测试:
func (c *Client) Run(ctx context.Context) {
var wg sync.WaitGroup
threadCount := c.p.GetInt(prop.ThreadCount, 1)
wg.Add(threadCount)
measureCtx, measureCancel := context.WithCancel(ctx)
measureCh := make(chan struct{}, 1)
go func() {
defer func() {
measureCh <- struct{}{}
}()
// 这里很有意思,因为有时候我们做数据库是需要初始化数据到缓存里面的
// 所以开始的一段时间我们不能计入测试统计中,这里有隔预热时间,可以通过 warmuptime 配置
if c.p.GetBool(prop.DoTransactions, true) {
dur := c.p.GetInt64(prop.WarmUpTime, 0)
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(dur) * time.Second):
}
}
// 预热完毕
measurement.EnableWarmUp(false)
dur := c.p.GetInt64(prop.LogInterval, 10)
t := time.NewTicker(time.Duration(dur) * time.Second)
defer t.Stop()
for {
select {
// 在运行的时候每隔 10 秒输出一次统计信息
case <-t.C:
measurement.Output()
case <-measureCtx.Done():
return
}
}
}()
// 做一些初始化的工作,如mysql需要创建表
if err := c.workload.Init(c.db); err != nil {
fmt.Printf("Initialize workload fail: %v\n", err)
return
}
// 根据 threadCount 创建多个线程操作数据库
for i := 0; i < threadCount; i++ {
go func(threadId int) {
defer wg.Done()
// 初始化 worker
w := newWorker(c.p, threadId, threadCount, c.workload, c.db)
ctx := c.workload.InitThread(ctx, threadId, threadCount)
ctx = c.db.InitThread(ctx, threadId, threadCount)
// 开始跑测试
w.run(ctx)
// 跑完测试做清理工作
c.db.CleanupThread(ctx)
c.workload.CleanupThread(ctx)
}(i)
}
// 等待测试跑完
wg.Wait()
measureCancel()
<-measureCh
}
这里分为两个部分:第一部分是创建一个线程,这个线程会控制是否开始测试统计,然后会每隔10秒输出一次统计信息;第二部分是根据设置的 threadcount 创建线程,运行 Worker 运行测试;
newWorker 的时候会根据 operationcount 设置 totalOpCount 表示总共需要执行次数,用 totalOpCount / int64(threadCount)
设置 opCount 表示 单线程操作的记录数。
func (w *worker) run(ctx context.Context) {
// 将线程操作分散开来,这样它们就不会同时击中DB了。
if w.targetOpsPerMs > 0.0 && w.targetOpsPerMs <= 1.0 {
time.Sleep(time.Duration(rand.Int63n(w.targetOpsTickNs)))
}
startTime := time.Now()
// 循环直到操作数达到 opsDone
for w.opCount == 0 || w.opsDone < w.opCount {
var err error
opsCount := 1
// 这里是执行基准测试
if w.doTransactions {
if w.doBatch {
err = w.workload.DoBatchTransaction(ctx, w.batchSize, w.workDB)
opsCount = w.batchSize
} else {
err = w.workload.DoTransaction(ctx, w.workDB)
}
// 这里是执行 load 数据
} else {
if w.doBatch {
err = w.workload.DoBatchInsert(ctx, w.batchSize, w.workDB)
opsCount = w.batchSize
} else {
err = w.workload.DoInsert(ctx, w.workDB)
}
}
// 预热完了会进行操作次数的统计
if measurement.IsWarmUpFinished() {
w.opsDone += int64(opsCount)
w.throttle(ctx, startTime)
}
select {
case <-ctx.Done():
return
default:
}
}
}
基准测试的具体执行是交给 workload 的 DoTransaction 方法来判断执行。
func (c *core) DoTransaction(ctx context.Context, db ycsb.DB) error {
state := ctx.Value(stateKey).(*coreState)
r := state.r
// 根据会根据不同的测试场景,进入到不同的测试分支
// Next 方法会根据设置的 readproportion、updateproportion、 scanproportion等概率来获取相应操作类型
operation := operationType(c.operationChooser.Next(r))
switch operation {
case read:
return c.doTransactionRead(ctx, db, state)
case update:
return c.doTransactionUpdate(ctx, db, state)
case insert:
return c.doTransactionInsert(ctx, db, state)
case scan:
return c.doTransactionScan(ctx, db, state)
default:
return c.doTransactionReadModifyWrite(ctx, db, state)
}
}
这里会调用 operationChooser 的 Next 方法来判断该执行那个指令,执行指令的概率是我们在配置文件里面设置好的。
这个算法很简单,在初始化 operationChooser 会将设置的参数readproportion、updateproportion、 scanproportion的值以数组的形式 add 到 operationChooser 的 values 里面,然后随机一个 0~1的小数,检查这个随机数落在哪个范围就好了:
func (d *Discrete) Next(r *rand.Rand) int64 {
sum := float64(0)
for _, p := range d.values {
sum += p.Weight
}
// 随机一个 0~1的小数
val := r.Float64()
for _, p := range d.values {
pw := p.Weight / sum
if val < pw {
d.SetLastValue(p.Value)
return p.Value
}
val -= pw
}
panic("oops, should not get here.")
}
在代码实现上就是按照上面说的,将所有 values 的值加起来得到 sum,然后计算每个 value 的占比是否达到随机数值。
最后我们再来看看 doTransactionRead 是怎么执行的:
func (c *core) doTransactionRead(ctx context.Context, db ycsb.DB, state *coreState) error {
r := state.r
// 根据我们设置的 requestdistribution 获取一个 key 值
keyNum := c.nextKeyNum(state)
keyName := c.buildKeyName(keyNum)
//被读取的字段
var fields []string
if !c.readAllFields {
// 如果不是读取所有字段,那么根据fieldChooser字段选择器选择一个字段执行
fieldName := state.fieldNames[c.fieldChooser.Next(r)]
fields = append(fields, fieldName)
} else {
fields = state.fieldNames
}
//调用 db 的read方法
values, err := db.Read(ctx, c.table, keyName, fields)
if err != nil {
return err
}
//校验数据完整性
if c.dataIntegrity {
c.verifyRow(state, keyName, values)
}
return nil
}
这里首先会调用 nextKeyNum 去获取 key 值,这里的 key 会根据我们设置的 requestdistribution 参数根据一定的规则获取到。然后校验完需要读哪些字段后调用 DbWrapper 的 Read 方法读取数据。
func (db DbWrapper) Read(ctx context.Context, table string, key string, fields []string) (_ map[string][]byte, err error) {
start := time.Now()
defer func() {
// 进行测试数据统计
measure(start, "READ", err)
}()
return db.DB.Read(ctx, table, key, fields)
}
DbWrapper 会封装一层,用 defer 方法调用 measure 进行统计。
不过这里我有问题是在读取数据的时候通过还会根据传入的 fields 来进行解析,这样也会损耗一些性能,不知是否合理,如redis 的 Read 方法:
func (r *redis) Read(ctx context.Context, table string, key string, fields []string) (map[string][]byte, error) {
data := make(map[string][]byte, len(fields))
res, err := r.client.Get(table + "/" + key).Result()
if err != nil {
return nil, err
}
// 反序列化
err = json.Unmarshal([]byte(res), &data)
if err != nil {
return nil, err
}
// TODO: filter by fields
return data, err
}
每一次操作完毕之后都会调用到 measure 方法,进行测试数据统计。
func measure(start time.Time, op string, err error) {
// 计算耗时
lan := time.Now().Sub(start)
if err != nil {
measurement.Measure(fmt.Sprintf("%s_ERROR", op), lan)
return
}
measurement.Measure(op, lan)
}
统计信息由于是会有多个线程同时操作,所以需要使用线程安全的方式进行操作:
func (h *histogram) Measure(latency time.Duration) {
// 这里是 us 微秒
n := int64(latency / time.Microsecond)
atomic.AddInt64(&h.sum, n)
atomic.AddInt64(&h.count, 1)
// 这里转为毫秒ms
bound := int(n / h.boundInterval)
// boundCounts 是一个并发map,用来统计每个时间段(单位:ms)中有多少次操作
h.boundCounts.Upsert(bound, 1, func(ok bool, existedValue int64, newValue int64) int64 {
if ok {
return existedValue + newValue
}
return newValue
})
// 设置最小时延
for {
oldMin := atomic.LoadInt64(&h.min)
if n >= oldMin {
break
}
if atomic.CompareAndSwapInt64(&h.min, oldMin, n) {
break
}
}
// 设置最大时延
for {
oldMax := atomic.LoadInt64(&h.max)
if n <= oldMax {
break
}
if atomic.CompareAndSwapInt64(&h.max, oldMax, n) {
break
}
}
}
统计每个时间段(单位:ms)内操作的次数是使用 boundCounts,它是 Go YCSB 自己实现的 ConcurrentMap 保证线程安全,用来统计单位时间内操作的次数;
最大和最小时延是通过 CAS 进行操作的,也是为了保证线程安全。
统计完之后会调用 getInfo 计算耗时:
func (h *histogram) getInfo() map[string]interface{} {
min := atomic.LoadInt64(&h.min)
max := atomic.LoadInt64(&h.max)
sum := atomic.LoadInt64(&h.sum)
count := atomic.LoadInt64(&h.count)
bounds := h.boundCounts.Keys()
sort.Ints(bounds)
avg := int64(float64(sum) / float64(count))
per99 := 0
per999 := 0
per9999 := 0
opCount := int64(0)
// 计算 P99,P99.9,P99.99
// 这里实际上是统计一个占比
// bound 里面会保存每毫秒有多少次操作
for _, bound := range bounds {
boundCount, _ := h.boundCounts.Get(bound)
opCount += boundCount
per := float64(opCount) / float64(count)
// 这里是 99% 的操作是落在哪个时间区间内
if per99 == 0 && per >= 0.99 {
per99 = (bound + 1) * 1000
}
if per999 == 0 && per >= 0.999 {
per999 = (bound + 1) * 1000
}
if per9999 == 0 && per >= 0.9999 {
per9999 = (bound + 1) * 1000
}
}
// 计算整个测试耗时
elapsed := time.Now().Sub(h.startTime).Seconds()
// 计算单位耗时内操作次数
qps := float64(count) / elapsed
res := make(map[string]interface{})
res[ELAPSED] = elapsed
res[COUNT] = count
res[QPS] = qps
res[AVG] = avg
res[MIN] = min
res[MAX] = max
res[PER99TH] = per99
res[PER999TH] = per999
res[PER9999TH] = per9999
return res
}
这里的 per99、per999、per9999 实际上精度只有毫秒,是为了做直方图导出而设计的(然后作者在这个项目已经过去3年了,还没加上这个功能)。
通过上面的分析可以发现, Go YCSB 设计还是很精妙的,通过很少的代码就可以进行 DB 的扩展;配置也是相当灵活,可以根据不同的 requestdistribution 提供了不同的测试环境,并且在测试中也可以随意的调整读写概率,保证可以尽可能的模拟线上的环境。
但是它也有很多不足,一方面是文档很不充分,基本上就写了几个参数配置;另一方面就是很多功能都没有实现,线上测试的时候经常会出现ERROR,去代码一看结果是没有实现。三年前作者的博客中就说要实现测试结果导出功能,结果现在还没实现。我已经给作者 tl@pingcap.com 发邮件了,等待回复。
https://github.com/pingcap/go-ycsb
https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/ssZL3svZMbq0CFhCc70kNg
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。