2 Star 3 Fork 1

roseduan / lotusdb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
vlog_test.go 13.01 KB
一键复制 编辑 原始数据 按行查看 历史
roseduan 提交于 2022-03-20 12:20 . fix vlog compaction
package lotusdb
import (
	"bytes"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/flower-corp/lotusdb/logfile"
	"github.com/stretchr/testify/assert"
)
// TestOpenValueLog opens a value log with both supported I/O types, and
// checks that reopening a directory whose active file is nearly full
// rolls over to a new active log file.
func TestOpenValueLog(t *testing.T) {
	t.Run("fileio", func(t *testing.T) {
		testOpenValueLog(t, logfile.FileIO)
	})
	t.Run("mmap", func(t *testing.T) {
		testOpenValueLog(t, logfile.MMap)
	})
	t.Run("set file state", func(t *testing.T) {
		path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
		assert.Nil(t, err)
		err = os.MkdirAll(path, os.ModePerm)
		assert.Nil(t, err)
		defer func() {
			_ = os.RemoveAll(path)
		}()
		// small block size (180) so a single 128B-value write brings the
		// active file close to full.
		vlog, err := openValueLogForTest(path, 180, logfile.FileIO, 0.5)
		assert.Nil(t, err)
		_, _, err = vlog.Write(&logfile.LogEntry{Key: GetKey(923), Value: GetValue128B()})
		assert.Nil(t, err)
		// open again, the old active log file is close to full, so we will create a new active log file.
		vlog1, err := openValueLogForTest(path, 180, logfile.FileIO, 0.5)
		assert.Nil(t, err)
		assert.NotNil(t, vlog1)
	})
}
// testOpenValueLog checks openValueLog against several configurations:
// a zero block size (must fail), an empty directory, and a directory that
// already contains value-log files.
func testOpenValueLog(t *testing.T, ioType logfile.IOType) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	assert.Nil(t, os.MkdirAll(path, os.ModePerm))
	defer func() {
		_ = os.RemoveAll(path)
	}()

	type args struct {
		path      string
		blockSize int64
		ioType    logfile.IOType
	}
	cases := []struct {
		name     string
		args     args
		hasFiles bool
		wantErr  bool
	}{
		{"size-zero", args{path: path, blockSize: 0, ioType: ioType}, false, true},
		{"no-files", args{path: path, blockSize: 100, ioType: ioType}, false, false},
		{"with-files", args{path: path, blockSize: 100, ioType: ioType}, true, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// pre-create a few log files when the case expects an existing directory.
			if tc.hasFiles {
				for fid := uint32(1); fid <= 5; fid++ {
					_, err := logfile.OpenLogFile(path, fid, 100, logfile.ValueLog, ioType)
					assert.Nil(t, err)
				}
			}
			got, err := openValueLogForTest(tc.args.path, tc.args.blockSize, tc.args.ioType, 0.5)
			if (err != nil) != tc.wantErr {
				t.Errorf("openValueLog() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if !tc.wantErr {
				assert.NotNil(t, got)
			}
		})
	}
}
func TestValueLog_Write(t *testing.T) {
t.Run("fileio", func(t *testing.T) {
testValueLogWrite(t, logfile.FileIO)
})
t.Run("mmap", func(t *testing.T) {
testValueLogWrite(t, logfile.MMap)
})
}
// testValueLogWrite exercises valueLog.Write with the given I/O type and
// verifies each written entry can be read back from its returned position.
func testValueLogWrite(t *testing.T, ioType logfile.IOType) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	err = os.MkdirAll(path, os.ModePerm)
	assert.Nil(t, err)
	defer func() {
		_ = os.RemoveAll(path)
	}()
	vlog, err := openValueLogForTest(path, 1024<<20, ioType, 0.5)
	assert.Nil(t, err)
	type fields struct {
		vlog *valueLog
	}
	type args struct {
		e *logfile.LogEntry
	}
	tests := []struct {
		name    string
		fields  fields
		args    args
		want    *valuePos
		wantErr bool
	}{
		// don`t run the sub test alone, because the offset is incremental, run them all at once!!!
		{
			"nil-entry", fields{vlog: vlog}, args{e: nil}, &valuePos{}, false,
		},
		{
			"no-key", fields{vlog: vlog}, args{e: &logfile.LogEntry{Value: []byte("lotusdb")}}, &valuePos{Fid: 0, Offset: 0}, false,
		},
		{
			"no-value", fields{vlog: vlog}, args{e: &logfile.LogEntry{Key: []byte("key1")}}, &valuePos{Fid: 0, Offset: 15}, false,
		},
		{
			"with-key-value", fields{vlog: vlog}, args{e: &logfile.LogEntry{Key: []byte("key2"), Value: []byte("lotusdb-2")}}, &valuePos{Fid: 0, Offset: 27}, false,
		},
		{
			"key-big-value", fields{vlog: vlog}, args{e: &logfile.LogEntry{Key: []byte("key3"), Value: GetValue4K()}}, &valuePos{Fid: 0, Offset: 48}, false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			vlog := tt.fields.vlog
			got, _, err := vlog.Write(tt.args.e)
			if (err != nil) != tt.wantErr {
				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("Write() got = %v, want %v", got, tt.want)
			}
			// read the entry back from the vlog and compare with what was written.
			if tt.args.e != nil {
				e, err := vlog.Read(got.Fid, got.Offset)
				assert.Nil(t, err)
				// bytes.Equal is the idiomatic byte-slice equality check.
				if !bytes.Equal(e.Key, tt.args.e.Key) || !bytes.Equal(e.Value, tt.args.e.Value) {
					// report the entry actually read back (e), not the write position.
					t.Errorf("Write() write = %v, but got = %v", tt.args.e, e)
				}
			}
		})
	}
}
// TestValueLog_WriteAfterReopen verifies that an entry written before closing
// the value log remains readable after it is reopened (even with a different
// I/O type), and that writes after the reopen also succeed.
func TestValueLog_WriteAfterReopen(t *testing.T) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	err = os.MkdirAll(path, os.ModePerm)
	assert.Nil(t, err)
	defer func() {
		_ = os.RemoveAll(path)
	}()
	vlog, err := openValueLogForTest(path, 100, logfile.FileIO, 0.5)
	assert.Nil(t, err)
	tests := []*logfile.LogEntry{
		{
			Key: []byte("key-1"), Value: []byte("val-1")},
		{
			Key: []byte("key-2"), Value: []byte("val-2"),
		},
	}
	var pos []*valuePos
	pos1, _, err := vlog.Write(tests[0])
	assert.Nil(t, err)
	pos = append(pos, pos1)
	err = vlog.Close()
	assert.Nil(t, err)
	// reopen it (with MMap this time) and write the second entry.
	vlog, err = openValueLogForTest(path, 100, logfile.MMap, 0.5)
	assert.Nil(t, err)
	pos2, _, err := vlog.Write(tests[1])
	assert.Nil(t, err)
	pos = append(pos, pos2)
	for i := 0; i < len(pos); i++ {
		res, err := vlog.Read(pos[i].Fid, pos[i].Offset)
		assert.Nil(t, err)
		// bytes.Equal is the idiomatic byte-slice equality check.
		if !bytes.Equal(res.Key, tests[i].Key) || !bytes.Equal(res.Value, tests[i].Value) {
			t.Errorf("WriteAfterReopen() write = %v, but got = %v", tests[i], res)
		}
	}
}
func TestValueLog_WriteUntilNewActiveFileOpen(t *testing.T) {
t.Run("fileio", func(t *testing.T) {
testValueLogWriteUntilNewActiveFileOpen(t, logfile.FileIO)
})
t.Run("mmap", func(t *testing.T) {
testValueLogWriteUntilNewActiveFileOpen(t, logfile.MMap)
})
}
// testValueLogWriteUntilNewActiveFileOpen writes enough 128B entries to roll
// the active log file over, then checks that sampled positions (first, last,
// and one random index) still resolve to non-empty entries.
func testValueLogWriteUntilNewActiveFileOpen(t *testing.T, ioType logfile.IOType) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	assert.Nil(t, os.MkdirAll(path, os.ModePerm))
	defer func() {
		_ = os.RemoveAll(path)
	}()

	vlog, err := openValueLogForTest(path, 10<<20, ioType, 0.5)
	assert.Nil(t, err)

	const total = 100000
	picked := rand.Intn(total - 1)
	if picked == 0 {
		picked = 1
	}
	var sampled []*valuePos
	for i := 0; i <= total; i++ {
		pos, _, err := vlog.Write(&logfile.LogEntry{Key: GetKey(i), Value: GetValue128B()})
		assert.Nil(t, err)
		if i == 0 || i == total || i == picked {
			sampled = append(sampled, pos)
		}
	}

	// make sure all writes are valid: every sampled position must read back
	// a non-empty entry.
	for _, p := range sampled {
		e, err := vlog.Read(p.Fid, p.Offset)
		assert.Nil(t, err)
		if len(e.Key) == 0 && len(e.Value) == 0 {
			t.Errorf("WriteUntilNewActiveFileOpen() write a valid entry, but got = %v", e)
		}
	}
}
func TestValueLog_Read(t *testing.T) {
t.Run("fileio", func(t *testing.T) {
testValueLogRead(t, logfile.FileIO)
})
t.Run("mmap", func(t *testing.T) {
testValueLogRead(t, logfile.MMap)
})
}
// testValueLogRead writes a large batch of entries, keeps a few sampled
// positions, then verifies Read both for invalid arguments and for the
// sampled (valid) positions.
func testValueLogRead(t *testing.T, ioType logfile.IOType) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	assert.Nil(t, os.MkdirAll(path, os.ModePerm))
	defer func() {
		_ = os.RemoveAll(path)
	}()

	vlog, err := openValueLogForTest(path, 10<<20, ioType, 0.5)
	assert.Nil(t, err)

	type record struct {
		e   *logfile.LogEntry
		pos *valuePos
	}
	var samples []*record

	// write a large number of entries; keep the first, a random one, and the last.
	const total = 100000
	picked := rand.Intn(total - 1)
	if picked == 0 {
		picked = 1
	}
	for i := 0; i <= total; i++ {
		v := GetValue128B()
		pos, _, err := vlog.Write(&logfile.LogEntry{Key: GetKey(i), Value: v})
		assert.Nil(t, err)
		if i == 0 || i == total || i == picked {
			samples = append(samples, &record{e: &logfile.LogEntry{Key: GetKey(i), Value: v}, pos: pos})
		}
	}

	cases := []struct {
		name    string
		fid     uint32
		offset  int64
		want    *logfile.LogEntry
		wantErr bool
	}{
		{"invalid-fid", 100, 0, nil, true},
		{"invalid-offset", 0, -23, nil, true},
		{"offset-not-entry", 0, 1, nil, true},
		{"valid-0", samples[0].pos.Fid, samples[0].pos.Offset, samples[0].e, false},
		{"valid-1", samples[1].pos.Fid, samples[1].pos.Offset, samples[1].e, false},
		{"valid-2", samples[2].pos.Fid, samples[2].pos.Offset, samples[2].e, false},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got, err := vlog.Read(tc.fid, tc.offset)
			if (err != nil) != tc.wantErr {
				t.Errorf("Read() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tc.want) {
				t.Errorf("Read() got = %v, want %v", got, tc.want)
			}
		})
	}
}
// TestValueLog_ReadFromArchivedFile ensures entries written before a restart
// are still readable from archived (non-active) log files after reopening.
func TestValueLog_ReadFromArchivedFile(t *testing.T) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	assert.Nil(t, os.MkdirAll(path, os.ModePerm))
	defer func() {
		_ = os.RemoveAll(path)
	}()

	vlog, err := openValueLogForTest(path, 10<<20, logfile.FileIO, 0.5)
	assert.Nil(t, err)
	const total = 100000
	for i := 0; i <= total; i++ {
		_, _, err := vlog.Write(&logfile.LogEntry{Key: GetKey(i), Value: GetValue128B()})
		assert.Nil(t, err)
	}

	// close and reopen it, then read the very first entry of file 0.
	assert.Nil(t, vlog.Close())
	reopened, err := openValueLogForTest(path, 10<<20, logfile.FileIO, 0.5)
	assert.Nil(t, err)
	e, err := reopened.Read(0, 0)
	assert.Nil(t, err)
	assert.True(t, len(e.Key) > 0)
	assert.True(t, len(e.Value) > 0)
}
// TestValueLog_Sync checks that Sync on a freshly opened value log succeeds.
func TestValueLog_Sync(t *testing.T) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	assert.Nil(t, os.MkdirAll(path, os.ModePerm))
	defer func() {
		_ = os.RemoveAll(path)
	}()
	vlog, err := openValueLogForTest(path, 10<<20, logfile.FileIO, 0.5)
	assert.Nil(t, err)
	assert.Nil(t, vlog.Sync())
}
// TestValueLog_Close checks that Close on a freshly opened value log succeeds.
func TestValueLog_Close(t *testing.T) {
	path, err := filepath.Abs(filepath.Join("/tmp", "vlog-test"))
	assert.Nil(t, err)
	assert.Nil(t, os.MkdirAll(path, os.ModePerm))
	defer func() {
		_ = os.RemoveAll(path)
	}()
	vlog, err := openValueLogForTest(path, 10<<20, logfile.MMap, 0.5)
	assert.Nil(t, err)
	assert.Nil(t, vlog.Close())
}
// openValueLogForTest is a convenience wrapper that builds vlogOptions from
// the given parameters and opens a value log with them.
func openValueLogForTest(path string, blockSize int64, ioType logfile.IOType, gcRatio float64) (*valueLog, error) {
	return openValueLog(vlogOptions{
		path:      path,
		blockSize: blockSize,
		ioType:    ioType,
		gcRatio:   gcRatio,
	})
}
// TestValueLog_Compaction_Normal writes and deletes enough data to trigger
// memtable flushes and (per the configured GC interval/ratio) value-log
// compaction, then briefly waits so the background work can run.
func TestValueLog_Compaction_Normal(t *testing.T) {
	opts := DefaultOptions("/tmp" + separator + "lotusdb")
	opts.CfOpts.ValueLogFileSize = 16 * 1024 * 1024
	opts.CfOpts.MemtableSize = 32 << 20
	opts.CfOpts.ValueLogGCRatio = 0.5
	opts.CfOpts.ValueLogGCInterval = time.Second * 7
	db, err := Open(opts)
	assert.Nil(t, err)
	defer destroyDB(db)

	// write enough data that can trigger flush operation.
	const total = 600000
	for i := 0; i <= total; i++ {
		assert.Nil(t, db.Put(GetKey(i), GetValue128B()))
	}
	// delete roughly half of the keys so compaction has garbage to reclaim.
	for i := 100; i < total/2; i++ {
		assert.Nil(t, db.Delete(GetKey(i)))
	}
	// flush again with a second batch of fresh keys.
	for i := total * 2; i <= total*4; i++ {
		assert.Nil(t, db.Put(GetKey(i), GetValue128B()))
	}
	time.Sleep(time.Second * 2)
}
//func TestValueLog_Compaction_While_Writing(t *testing.T) {
// testCompacction(t, false, true)
//}
//
//func TestValueLog_Compaction_While_Reading(t *testing.T) {
// testCompacction(t, true, false)
//}
//
//func TestValueLog_Compaction_Rewrite(t *testing.T) {
// testCompacction(t, false, false)
//}
// testCompacction writes and overwrites enough data to trigger flushes and
// value-log compaction, optionally with concurrent background readers or
// writers, and finally verifies every overwritten key returns its latest
// value. (The name keeps its original spelling because the commented-out
// callers above reference it.)
func testCompacction(t *testing.T, reading, writing bool) {
	path, _ := ioutil.TempDir("", "lotusdb")
	opts := DefaultOptions(path)
	opts.CfOpts.ValueLogFileSize = 16 * 1024 * 1024
	opts.CfOpts.MemtableSize = 32 << 20
	opts.CfOpts.ValueLogGCRatio = 0.5
	opts.CfOpts.ValueLogGCInterval = time.Second * 7
	db, err := Open(opts)
	assert.Nil(t, err)
	defer destroyDB(db)

	// Stop and wait for the background goroutines before the deferred
	// destroyDB runs; otherwise they would keep using a destroyed DB and
	// could call t.Log after the test has finished, which panics.
	// (Deferred funcs run LIFO, so this executes before destroyDB.)
	done := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		close(done)
		wg.Wait()
	}()

	// write enough data that can trigger flush operation.
	var writeCount = 600000
	for i := 0; i <= writeCount; i++ {
		err := db.Put(GetKey(i), GetValue128B())
		assert.Nil(t, err)
	}

	type kv struct {
		key   []byte
		value []byte
	}
	var kvs []*kv
	// overwrite part of the keys so compaction has stale values to discard.
	for i := 100; i < writeCount/2; i++ {
		k, v := GetKey(i), GetValue128B()
		err := db.Put(k, v)
		assert.Nil(t, err)
		kvs = append(kvs, &kv{key: k, value: v})
	}

	if reading {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(time.Second)
			rand.Seed(time.Now().UnixNano())
			for {
				select {
				case <-done:
					return
				default:
				}
				k := GetKey(rand.Intn(writeCount / 2))
				if _, err := db.Get(k); err != nil {
					t.Log("read data err.", err)
				}
			}
		}()
	}
	if writing {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(time.Second)
			count := writeCount * 5
			for {
				select {
				case <-done:
					return
				default:
				}
				err := db.Put(GetKey(count), GetValue128B())
				count++
				if err != nil {
					t.Log("write data err.", err)
				}
			}
		}()
	}

	// flush again
	for i := writeCount * 2; i <= writeCount*4; i++ {
		err := db.Put(GetKey(i), GetValue128B())
		assert.Nil(t, err)
	}
	time.Sleep(time.Second * 2)

	// every overwritten key must return its most recent value.
	for _, kv := range kvs {
		v, err := db.Get(kv.key)
		assert.Nil(t, err)
		assert.Equal(t, v, kv.value)
	}
}
Go
1
https://gitee.com/roseduan/lotusdb.git
git@gitee.com:roseduan/lotusdb.git
roseduan
lotusdb
lotusdb
main

搜索帮助