打造先进的内存KV数据库-7 反射以及并发锁

news/2024/5/19 6:39:40 标签: 内存数据库, 线程安全, 反射, 并发锁

反射">反射

反射作为一种代码组织形式,带来了极大的不安全因素,同时也带来了许多便利之处,通过方法、对象、类型名称来获得具体实例,可以避免大量if-else分支,使得代码优雅,monkeyDB的服务端代码最后采用反射组织。

并发锁">并发锁

多线程访问同一资源时,需要对资源加锁,否则可能会得到预料之外的后果。由于内存数据库优越的读写性能,锁的粒度可以尽量大,monkeyDB使用库级锁(相当于表级锁)来保证线程安全
MonkeyDB项目已开源至 https://github.com/InsZVA/MonkeyDB 采用GPL协议

代码

//server.go
package main
// #cgo LDFLAGS: -L ./lib -lmonkeyS
// #include "./lib/core.h"
// #include <stdlib.h>
import "C"
import (
    "unsafe"
    "fmt"
    "net"
    "strings"
    "./tcp"
    "./convert"
    "reflect"
    "./minheap"
    "strconv"
    "flag"
    "sync"
)

////////////////////////////////////// 库级锁   ////////////////////////////////////////////////////////////////

var mutex map[C.Database]*sync.RWMutex

////////////////////////////////////// MinHeap ////////////////////////////////////////////////////////////////

///////////////////////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// command类型 用于解析处理各种数据库命令 //////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
var (
    acceptedCmd = []string{"set","get","delete","remove","createdb","switchdb","dropdb",/*"push","pop","destroy",*/"listdb"}
    port = flag.String("p","1517","侦听端口号")
)

type command []byte

func (cmd command) Set(db **C.Database) []byte {
    //锁并发
    _,ok := mutex[**db];
    if !ok {
        mutex[**db] = new(sync.RWMutex)
    }
    mutex[**db].Lock()
    defer mutex[**db].Unlock()

    response := []byte{}
    key,next := convert.ParseUntil(cmd,' ',4)
    value,_ := convert.ParseUntil(cmd,0,next+1)
    //fmt.Println("set ",key,value)
    r := C.Set(&(*db).tIndex,(*C.char)(convert.Bytes2C(key)),(convert.Bytes2C(value)))
    for i := 0;;i++ {
        response = append(response,byte(r.msg[i]))
        if response[i] == 0 { break; }
    }
    return response
}

func (cmd command) Get(db **C.Database) []byte {
    //锁并发
    _,ok := mutex[**db];
    if !ok {
        mutex[**db] = new(sync.RWMutex)
    }
    mutex[**db].RLock()
    defer mutex[**db].RUnlock()

    response := []byte{}
    key,_ := convert.ParseUntil(cmd,0,4)
    r := C.Get(&(*db).tIndex,(*C.char)(convert.Bytes2C(key)))
    if int(r.code) == 0 {
        for i := 0;;i++ {
            response = append(response,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i))))))
            if response[i] == 0 { break; }
        }
    }else {
    }
    return response
}

func (cmd command) Delete(db **C.Database) []byte {
    return cmd.Remove(db)
}

func (cmd command) Remove(db **C.Database) []byte {
    //锁并发
    _,ok := mutex[**db];
    if !ok {
        mutex[**db] = new(sync.RWMutex)
    }
    mutex[**db].Lock()
    defer mutex[**db].Unlock()

    response := []byte{}
    key,_ := convert.ParseUntil(cmd,0,7)
    r := C.Delete(&(*db).tIndex,(*C.char)(convert.Bytes2C(key)))
    for i := 0;;i++ {
        response = append(response,byte(r.msg[i]))
        if response[i] == 0 { break; }
    }
    return response
}

func (cmd command) Createdb(db **C.Database) []byte {
    response := []byte{}
    key,_ := convert.ParseUntil(cmd,0,9)
    d := C.CreateDB((*C.char)(convert.Bytes2C(key)))
    if d != nil {
        *db = d
        response = []byte("Already exist,switched\n")
    }else {
        response = []byte("Created\n")
    }
    return response
}

func (cmd command) Switchdb(db **C.Database) []byte {
    response := []byte{}
    key,_ := convert.ParseUntil(cmd,0,9)
    d := C.SwitchDB((*C.char)(convert.Bytes2C(key)))
    if d != nil {
        *db = d
        response = []byte("ok\n")
    }else {
        response = []byte("fail\n")
    }
    return response
}

func (cmd command) Dropdb(db **C.Database) []byte {
    //锁并发
    _,ok := mutex[**db];
    if !ok {
        mutex[**db] = new(sync.RWMutex)
    }
    mutex[**db].Lock()
    defer mutex[**db].Unlock()

    response := []byte{}
    key,_ := convert.ParseUntil(cmd,0,7)
    *db = C.DropDB((*C.char)(convert.Bytes2C(key)))
    return response
}

func (cmd command) Listdb(db **C.Database) []byte {
    response := []byte{}
    r := C.ListDB()
    for i := 0;i < 1024;i++ {
        b := byte(*(*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(r))+uintptr(i))))
        response = append(response,b)
        if(b == 0){ break; }
    }
    C.free(unsafe.Pointer(r))
    return response
}

func (cmd command) Push(db **C.Database) []byte {
    var response []byte
    key,next := convert.ParseUntil(cmd,' ',5)
    key2,next := convert.ParseUntil(cmd,' ',next+1)
    value,_ := convert.ParseUntil(cmd,0,next+1)
    var i int
    btnode := C.BTree_search((*db).tIndex,(C.KeyType)(C.GetHash((*C.char)(convert.Bytes2C(key)))),(*C.int)(unsafe.Pointer(&i)))
    if uintptr(unsafe.Pointer(btnode)) != uintptr(0) {  //B树上有此堆
        heap := (*minheap.MinHeap)(btnode.pRecord[i])   //BUG GO GC 会回收掉上次的内存
        k,_ := strconv.Atoi(string(key2))
        v,_ := strconv.Atoi(string(value))
        heap.Push(minheap.Pair{Key:uint32(k),Value:uint32(v)})
        response = []byte("Pushed")
    }else {                 //建一个新的堆
        heap := minheap.New()
        C.BTree_insert(&(*db).tIndex, (C.KeyType)(C.GetHash((*C.char)(convert.Bytes2C(key)))), unsafe.Pointer(&heap))
        k,_ := strconv.Atoi(string(key2))
        v,_ := strconv.Atoi(string(value))
        heap.Push(minheap.Pair{Key:uint32(k),Value:uint32(v)})
        response = []byte("Created and pushed")
    }
    return response
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
func initDB() { //初始化数据库

    str0 := "monkey"
    C.CreateDB((*C.char)(convert.String2C(str0)))   //创建基础数据库
    str0 = "config"                     //配置数据库
    C.CreateDB((*C.char)(convert.String2C(str0)))
    key := "passwd"     //初始密码
    data := "monkey"
    db := C.SwitchDB((*C.char)(convert.String2C(str0)))
    C.Set(&(db.tIndex),(*C.char)(convert.String2C(key)),(convert.String2C(data)))

}

func listen() {

    servicePort := ":"+*port
    tcpAddr,err := net.ResolveTCPAddr("tcp4",servicePort)
    if err != nil {
        panic(err)
    }
    l,err := net.ListenTCP("tcp",tcpAddr)   //侦听TCP
    if err != nil {
        panic(err)
    }
    fmt.Println("Server Started on "+*port+"!")
    for{
        conn,err := l.AcceptTCP()
        conn.SetKeepAlive(true)
        conn.SetNoDelay(true)
        if err != nil {
            panic(err)
        }
        s := tcp.TCPSession{Conn:conn}
        s.Init()
        go Handler(&s)
    }
}

func main() {
    flag.Parse()
    //初始化锁
    mutex = make(map[C.Database]*sync.RWMutex)
    initDB()
    listen()
}

func auth(s *tcp.TCPSession) bool {
    buff := s.ReadMessage() 
    params := strings.Split(string(buff)," ")
    str0 := "config"
    db := C.SwitchDB((*C.char)(convert.String2C(str0)))
    if params[0] != "auth" {
        s.SendMessage([]byte("Please auth first!"))
        return false
    }
    r := C.Get(&(db.tIndex),(*C.char)(convert.String2C("passwd")))
    if int(r.code) == 0 {
        passwd := []byte{}
        for i := 0;;i++ {
            passwd = append(passwd,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i))))))
            if passwd[i] == 0 {
                break
            }
        }
        if convert.Equal(passwd,[]byte(params[1])) {
            s.SendMessage([]byte("Auth success"))
            return true
        } else {
            s.SendMessage([]byte("Auth fail"))
            return false
        }
    }else {
        s.SendMessage([]byte("Auth success"))
        return true
    }
}

func Handler(s *tcp.TCPSession) {
    for !auth(s){
    }
    str := "monkey"                         
    db := C.SwitchDB((*C.char)(convert.String2C(str)))//环境变量-当前数据库
    for {
        if s.Closed {
            return
        }               
        buff := s.ReadMessage()
        // if err != nil {
        //  conn.Close()
        //  break
        // }
        if len(buff) == 0 {
            return
        }
        //commands := bytes.Split(buff,[]byte{0})
        //for _,cmd := range commands {
            TranslateMessage2(s,&db,buff)
        //}                     //解析消息
    }

}

func TranslateMessage2(s *tcp.TCPSession,db **C.Database,message []byte) {
    com := command(message)
    //fmt.Println("处理:",string(message))
    response := []byte{}
    for _,cmd := range acceptedCmd {
        if convert.StartBy(message,cmd) {
            result := reflect.ValueOf(com).MethodByName(convert.UpperHead(cmd)).Call([]reflect.Value{reflect.ValueOf(db)})
            response = result[0].Interface().([]byte)
            break
        }
    }
    s.SendMessage(response)
}


////////////////////////////////////////Dumplated//////////////////////////////////////////////////////////////////////////
func TranslateMessage(s *tcp.TCPSession,db **C.Database,message []byte) {
    command := string(message)
    params := strings.Split(command," ")
    //fmt.Println(params)
    response := []byte{}
    if params[0] == "set" {
        r := C.Set(&(*db).tIndex,(*C.char)(convert.String2C(params[1])),(convert.String2C(params[2])))
        for i := 0;;i++ {
            response = append(response,byte(r.msg[i]))
            if response[i] == 0 { break; }
        }
    }else if params[0] == "get" {
        r := C.Get(&(*db).tIndex,(*C.char)(convert.String2C(params[1])))
        // for i := 0;;i++ {
        //  response = append(response,byte(r.msg[i]))
        //  if response[i] == 0 { break; }
        // }
        if int(r.code) == 0 {
            for i := 0;;i++ {
                response = append(response,byte(*(*C.char)(unsafe.Pointer((uintptr(r.pData)+uintptr(i))))))
                if response[i] == 0 { break; }
            }
        }else {
            // for i := 0;;i++ {
            // response = append(response,byte(r.msg[i]))
            // if response[i] == 0 { break; }
            // }
        }

    }else if params[0] == "delete" || params[0] == "remove" {
        r := C.Delete(&(*db).tIndex,(*C.char)(convert.String2C(params[1])))
        for i := 0;;i++ {
            response = append(response,byte(r.msg[i]))
            if response[i] == 0 { break; }
        }

    }else if params[0] == "createdb" {
        d := C.CreateDB((*C.char)(convert.String2C(params[1])))
        if d != nil {
            *db = d
            response = []byte("Already exist,switched\n")
        }else {
            response = []byte("Created\n")
        }
    }else if params[0] == "switchdb" {
        d := C.SwitchDB((*C.char)(convert.String2C(params[1])))
        if d != nil {
            *db = d
            response = []byte("ok\n")
        }else {
            response = []byte("fail\n")
        }
    }else if params[0] == "dropdb" {
        *db = C.DropDB((*C.char)(convert.String2C(params[1])))
    }else if strings.EqualFold("listdb",params[0]) {
        r := C.ListDB()
        for i := 0;i < 1024;i++ {
            b := byte(*(*C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(r))+uintptr(i))))
            response = append(response,b)
            if(b == 0){ break; }
        }
        C.free(unsafe.Pointer(r))
    }else {
        //fmt.Println("unkown command:",params[0])
    }
    s.SendMessage(response)
}

http://www.niftyadmin.cn/n/1072272.html

相关文章

Sapphire算法:GC Without Stop the World(上)

Go的GC一致为人诟病&#xff0c;然而Go1.5据说大大优化了GC&#xff0c;具体可以见这篇文章http://www.oschina.net/translate/go-gc-solving-the-latency-problem-in-go-1-5 于是我打开了Go源代码&#xff0c;查看了Go GC相关代码&#xff0c;注释中说&#xff0c;Go现在使用…

星际争霸中的建筑学 生产建筑的研究

生产建筑主要包括虫&#xff1a;基地人&#xff1a;基地&#xff0c;兵营&#xff0c;重工&#xff0c;飞机场神&#xff1a;基地&#xff0c;兵营&#xff0c;轰击工厂&#xff08;VR&#xff09;&#xff0c;飞机场 其中除了虫族的基地比较特殊之外&#xff0c;其他所有生产建…

Windows Phone 7 创建自定义的控件

创建自定义的控件&#xff1a;需要从控件&#xff08;或 ContentControl&#xff09;派生&#xff0c;至少&#xff0c;为了继承基本的控件功能&#xff0c;该控件类应从 Silverlight System.Windows.Controls.Control 类派生。但是&#xff0c;它也可以从 ContentControl 和 …

Go语言黑魔法中的问题修正

原文&#xff1a; http://studygolang.com/articles/2909 文中大多技巧都是正确的&#xff0c;但是结构体和[]byte之间的转换&#xff1a; 第三式 - 结构体和[]byte互转 有一天&#xff0c;你想把一个简单的结构体转成二进制数据保存起来&#xff0c;这时候你想到了encoding/g…

内存映射系统开发

为了使用内存作为数据库的主要存储方式&#xff0c;开发内存数据库&#xff0c;我们需要对内存中的数据进行保证。即可以备份与还原&#xff0c;那么为了将内存中的数据备份到外存中&#xff0c;我们可以采取以下策略&#xff1a; 选取一个外存文件&#xff0c;将其映射到某个…

[转载]Oracle数据库异构数据联结详解(1)

假设你有两个数据源头&#xff0c;如平面文件或表数据&#xff0c;并且要将他们兼并在一同&#xff0c;你将怎样做?假设他们有一个共同的属性&#xff0c;如客户ID&#xff0c;那么该打点方案应该是很明明&#xff1a;兼并相干的属性&#xff0c;在这个例子中&#xff0c;只需…

MySQL 管理软件

HeidiSQL web&#xff1a;http://www.heidisql.com免费的MySQL管理软件&#xff0c;而且是一个delphi的开源软件。Navicat for MySQL 9.1.6注册码SN: NAVD-ULQR-YD73-V3BRDownload: http://download.cnet.com/Navicat-MySQL-GUI/3000-10254_4-10071792.htmlnote&#xff1a;英文…

关于C#里面socket编程的一点理解

这几天倒腾socket编程&#xff0c;在博客园找到了张子阳大哥的socket编程的系列文章 看了很不错 。 网上也有许多类似的文章&#xff0c;我这里也纯属炒剩饭 。献丑了哈 首先还是用tcpListener 跟tcpClient吧 先把基础的搞懂了再说 socket编程的概念用我们以前老师的话说就是一…