前言
多线程/进程编程是每个程序员的基本功,同时也是开发中的难点,处理各种“锁”的问题是让人十分头痛的一件事。
Actor模型于1973年由Carl Hewitt提出,后经Erlang OTP推广,其消息传递机制更加符合面向对象的原始意图。Actor模型属于并发组件模型,它以组件的方式定义并发编程范式,是并发编程的高级阶段,使用者无需直接接触多线程并发或线程池等基础概念。
Actor模型的基础就是消息传递,一个Actor可以认为是一个基本的计算单元,它能接收消息并基于消息执行运算,也可以发送消息给其他Actor。各个Actor之间相互隔离,不共享内存。
Actor本身封装了状态和行为,在进行并发编程时,Actor只需要关注消息和其自身的状态。由于消息是不可变对象,并且每个Actor同一时刻只处理一条消息,所以Actor不需要去关注锁和内存原子性等一系列多线程常见的问题。
1:上代码
核心代码
MySynMsgChan 同步时使用
package myqueue
import (
"gameserver/myqueue/queue/mpsc"
"runtime"
"sync/atomic"
"log"
"time"
)
// Scheduler states stored in MyQueue.schedulerStatus.
const (
	// idle means no processing loop currently owns the mailbox.
	idle int32 = 0
	// running means a processing loop has been scheduled or is executing.
	running int32 = 1
)
// //
// Dispatcher decides how a mailbox's processing function gets executed and
// how many messages it may handle before yielding the processor.
type Dispatcher interface {
	// Schedule arranges for fn to be executed.
	Schedule(fn func())
	// Throughput reports the message budget between cooperative yields.
	Throughput() int
}

// Compile-time checks that both dispatchers satisfy Dispatcher.
var (
	_ Dispatcher = goroutineDispatcher(0)
	_ Dispatcher = synchronizedDispatcher(0)
)

// goroutineDispatcher runs every scheduled function on a new goroutine.
// Its integer value is the throughput it reports.
type goroutineDispatcher int

// Schedule starts fn on its own goroutine and returns immediately.
func (d goroutineDispatcher) Schedule(fn func()) {
	go fn()
}

// Throughput returns the value the dispatcher was created with.
func (d goroutineDispatcher) Throughput() int {
	throughput := int(d)
	return throughput
}

// NewDefaultDispatcher returns a Dispatcher that schedules work on new
// goroutines and reports the given throughput.
func NewDefaultDispatcher(throughput int) Dispatcher {
	var d goroutineDispatcher = goroutineDispatcher(throughput)
	return d
}

// synchronizedDispatcher runs every scheduled function inline on the
// caller's goroutine. Its integer value is the throughput it reports.
type synchronizedDispatcher int

// Schedule calls fn directly and returns once fn has returned.
func (d synchronizedDispatcher) Schedule(fn func()) {
	fn()
}

// Throughput returns the value the dispatcher was created with.
func (d synchronizedDispatcher) Throughput() int {
	throughput := int(d)
	return throughput
}

// NewSynchronizedDispatcher returns a Dispatcher that executes work
// synchronously and reports the given throughput.
func NewSynchronizedDispatcher(throughput int) Dispatcher {
	var d synchronizedDispatcher = synchronizedDispatcher(throughput)
	return d
}
//
// Statistics is the callback interface a MyQueue is constructed with.
// MyQueue.run invokes MessageReceived on the first registered Statistics
// for every message it pops from the mailbox.
type Statistics interface {
// MessageReceived is called with each message taken from the mailbox.
MessageReceived(message interface{})
}
// MyQueue is a mailbox: messages posted with PostUserMessage are queued and
// later handed, one at a time, to the registered Statistics handler by a
// processing loop started through the Dispatcher.
type MyQueue struct {
// userMailbox holds posted messages until run pops them.
userMailbox *mpsc.Queue
// schedulerStatus is idle or running; it is read and written atomically.
schedulerStatus int32
// userMessages counts queued messages: incremented on post, decremented on pop.
userMessages int32
// suspended is not referenced by any code visible in this file.
suspended bool
// dispatcher schedules processMessages and supplies the throughput budget.
dispatcher Dispatcher
// mailboxStats are the registered handlers; run only invokes element 0.
mailboxStats []Statistics
// bStop is set by Stop and makes PostUserMessage drop new messages.
// NOTE(review): it is a plain bool accessed without synchronization.
bStop bool
}
// PostUserMessage enqueues message and makes sure a processing loop is
// scheduled. Messages posted after Stop are silently dropped.
func (m *MyQueue) PostUserMessage(message interface{}) {
	if stopped := m.isStop(); stopped {
		return
	}

	// Publish the message first, then account for it, so the consumer's
	// counter re-check cannot see a count without a queued message.
	m.userMailbox.Push(message)
	atomic.AddInt32(&m.userMessages, 1)

	// Only the caller that flips idle -> running schedules the loop.
	if !atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
		return
	}
	m.dispatcher.Schedule(m.processMessages)
}
// processMessages is the function handed to the Dispatcher. It drains the
// mailbox, releases the scheduler slot, and re-acquires it if messages were
// posted in the window between the drain finishing and the slot being freed.
func (m *MyQueue) processMessages() {
	for {
		m.run()

		// Release ownership so producers can schedule a new loop.
		atomic.StoreInt32(&m.schedulerStatus, idle)

		// Nothing arrived after the drain ended: we are done.
		if pending := atomic.LoadInt32(&m.userMessages); pending <= 0 {
			return
		}

		// Messages are pending; keep going only if we win the slot back.
		// Losing the race means a producer already scheduled another loop.
		if !atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
			return
		}
	}
}
// run pops messages until the mailbox is empty, passing each one to the
// first registered Statistics handler. A panic raised while handling a
// message is recovered and logged together with that message, after which
// run returns.
func (m *MyQueue) run() {
	// current is declared outside the loop so the deferred recover can log
	// the message that was being handled when the panic occurred.
	var current interface{}
	defer func() {
		if r := recover(); r != nil {
			log.Printf("(m *MyQueue) run() err=%v msg=%#v \n", r, current)
		}
	}()

	handled := 0
	budget := m.dispatcher.Throughput()
	for {
		// Yield the processor once the throughput budget is exceeded.
		if handled > budget {
			handled = 0
			runtime.Gosched()
		}
		handled++

		current = m.userMailbox.Pop()
		if current == nil {
			return
		}
		atomic.AddInt32(&m.userMessages, -1)

		if len(m.mailboxStats) == 0 || m.mailboxStats[0] == nil {
			continue
		}
		m.mailboxStats[0].MessageReceived(current)
	}
}
// Stop marks the queue as stopped so that later PostUserMessage calls drop
// their messages. Calling Stop again has no further effect.
func (m *MyQueue) Stop() {
	if m.bStop {
		return
	}
	m.bStop = true
}
//
// isStop reports whether Stop has been called on this queue.
func (m *MyQueue) isStop() bool {
return m.bStop
}
// UnboundedMyQueue creates a MyQueue backed by a fresh mpsc queue. The
// dispatcher controls how the processing loop is executed; mailboxStats are
// the handlers registered to receive popped messages.
func UnboundedMyQueue(dispatcher Dispatcher, mailboxStats ...Statistics) *MyQueue {
	q := new(MyQueue)
	q.userMailbox = mpsc.New()
	q.mailboxStats = mailboxStats
	q.dispatcher = dispatcher
	return q
}
// MySynMsgChan pairs a request payload with a reply channel so that a caller
// can post the request to a mailbox and then block until the handler answers.
type MySynMsgChan struct {
	// MsgData is the request payload supplied by the caller.
	MsgData interface{}
	// MsgChan carries the reply back to the waiting caller.
	MsgChan chan interface{}
	// ChanState is 1 after construction and is atomically set to 0 once
	// WaitSynReply has finished (reply received, channel closed, or timeout).
	ChanState *int32
}

// GenerateMySynMsg wraps data in a MySynMsgChan whose reply channel has a
// buffer of one and whose ChanState is initialised to 1.
func GenerateMySynMsg(data interface{}) *MySynMsgChan {
	state := new(int32)
	*state = 1
	return &MySynMsgChan{
		MsgData:   data,
		MsgChan:   make(chan interface{}, 1),
		ChanState: state,
	}
}

// WaitSynReply blocks until a reply arrives on MsgChan, MsgChan is closed, or
// d elapses, whichever happens first. Durations shorter than one millisecond
// are raised to one millisecond.
//
// It returns (reply, true) when a reply was received and (nil, false) on
// timeout or when MsgChan was closed without a reply. In every case ChanState
// is set to 0 before returning.
func (m *MySynMsgChan) WaitSynReply(d time.Duration) (interface{}, bool) {
	if d < time.Millisecond {
		d = time.Millisecond
	}

	timer := time.NewTimer(d)
	// Release the timer as soon as we are done instead of leaving it armed
	// until it expires.
	defer timer.Stop()
	// Whatever the outcome, nobody is waiting on this message any more.
	defer atomic.StoreInt32(m.ChanState, 0)

	select {
	case <-timer.C:
		return nil, false
	case msg, ok := <-m.MsgChan:
		if !ok {
			// A closed channel can never deliver a reply; report failure
			// immediately rather than spinning until the timer fires.
			return nil, false
		}
		return msg, true
	}
}
使用也很简单
实现 MessageReceived 接口
// CeateUserActor builds a UserActor for the websocket connection ws, gives it
// a mailbox whose handler is the actor itself, posts the initial
// NewSessionQueryUserInfo message, and then calls SetUpRecv for the connection.
// NOTE(review): part of the body is elided in this listing, so the use of
// headrul and platformId is not visible here.
func CeateUserActor(ws *websocket.Conn, wsIndex uint32, accname, headrul string, platformId, serverid uint32) {
useractor := new(UserActor)
useractor.WsConn = ws
useractor.shareUserState = new(shr.ShareState)
useractor.shareUserState.State = 1
useractor.remoteip = RetmoteIp(ws.RemoteAddr().String())
......................................................
useractor.wsConnIndex = wsIndex
useractor.u32State = STATE_READY_RECV_LOGIN
useractor.CurMaxItemID = base.ITEM_ID_MIN_NUM // default starting value
useractor.CurMaxHeroID = base.HERO_ID_MIN_NUM // default starting value
// The actor is registered as the mailbox's Statistics handler, so every
// message posted to msgQue reaches UserActor.MessageReceived.
useractor.msgQue = myqueue.UnboundedMyQueue(myqueue.NewDefaultDispatcher(64), useractor)
useractor.commonBagItemMap = map[uint64]uint64{}
useractor.endTimeItemMap = treemap.NewWith(treeutils.UInt64Comparator)
// First message in the mailbox: query the user info for the new session.
useractor.msgQue.PostUserMessage(&shr.NewSessionQueryUserInfo{})
SetUpRecv(ws, useractor.msgQue, wsIndex, accname, serverid)
}
..........................
// MessageReceived handles one message r taken from the actor's mailbox.
// Messages are ignored once the actor is in STATE_CLOSE; otherwise the
// message is dispatched on its dynamic type.
// NOTE(review): the remaining cases and the end of the function are elided
// in this listing.
func (self *UserActor) MessageReceived(r interface{}) {
if self.u32State == STATE_CLOSE {
return
}
switch r.(type) {
case *shr.NewSessionQueryUserInfo: // query for a new connection
self.doNewSessionQueryPlayer()
case *shr.ClientRecvNetMsg: // protobuf
// Disconnect this connection when the client data cannot be handled.
if !self.doClienNetData(r.(*shr.ClientRecvNetMsg).Clientdata) {
self.doDisconnect(self.wsConnIndex)
}
case *shr.NetDisconnect: // network disconnected
dis := r.(*shr.NetDisconnect)
self.doDisconnect(dis.DisIndex)
...................................................
}
//
..........................发送消息 会放入mailbox里
// SendMsgToAccMgr posts v to a mailbox via PostUserMessage.
// NOTE(review): the owner of msgque is redacted in this listing; presumably
// it is the account manager actor — confirm against the real source.
func SendMsgToAccMgr(v interface{}) {
*****.msgque.PostUserMessage(v)
}
同步等待回复
2:测试,这个已经在项目上用过了,这里不专门再写DEMO测试
3:如果觉得有用,麻烦点个赞,加个收藏