golang并发编程模型之actor(一)

发布于:2024-12-06 ⋅ 阅读:(80) ⋅ 点赞:(0)

前言
多线程/进程编程是每个程序员的基本功,同时也是开发中的难点,处理各种“锁”的问题是让人十分头痛的一件事。
Actor模型,在1973由Carl Hewitt定义,被Erlang OTP推广,其消息传递更加符合面向对象的原始意图。Actor模型属于并发组件模型,通过组件方式定义并发编程范式的高级阶段,避免使用者直接接触多线程并发或线程池等基础概念。
Actor模型的基础就是消息传递,一个Actor模型可以认为是一个基本的计算单元,它能接收消息并基于消息执行运算,也可以发送消息给其他Actor模型。各个Actor模型之间相互隔离,不共享内存。
Actor模型本身封装了状态和行为,在进行并发编程时,Actor模型只需要关注消息和其本身。而消息是一个不可变对象,所以Actor模型不需要去关注锁和内存原子性等一系列多线程常见的问题。

1:上代码
在这里插入图片描述
核心代码
MySynMsgChan 同步时使用

package myqueue

import (
	"gameserver/myqueue/queue/mpsc"
	"runtime"
	"sync/atomic"
	"log"
	"time"
)

const (
	idle int32 = iota
	running
)

// //
type Dispatcher interface {
	Schedule(fn func())
	Throughput() int
}

type goroutineDispatcher int

func (goroutineDispatcher) Schedule(fn func()) {
	go fn()
}

func (d goroutineDispatcher) Throughput() int {
	return int(d)
}

func NewDefaultDispatcher(throughput int) Dispatcher {
	return goroutineDispatcher(throughput)
}

type synchronizedDispatcher int

func (synchronizedDispatcher) Schedule(fn func()) {
	fn()
}

func (d synchronizedDispatcher) Throughput() int {
	return int(d)
}

func NewSynchronizedDispatcher(throughput int) Dispatcher {
	return synchronizedDispatcher(throughput)
}

/

type Statistics interface {
	MessageReceived(message interface{})
}

type MyQueue struct {
	userMailbox     *mpsc.Queue
	schedulerStatus int32
	userMessages    int32
	suspended       bool
	dispatcher      Dispatcher
	mailboxStats    []Statistics
	bStop           bool
}

func (m *MyQueue) PostUserMessage(message interface{}) {
	if m.isStop() {
		return
	}
	//	for _, ms := range m.mailboxStats {
	//		ms.MessagePosted(message)
	//	}
	m.userMailbox.Push(message)
	atomic.AddInt32(&m.userMessages, 1)
	//m.schedule()
	if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
		m.dispatcher.Schedule(m.processMessages)
	}
}

func (m *MyQueue) processMessages() {

process:
	m.run()

	// set mailbox to idle
	atomic.StoreInt32(&m.schedulerStatus, idle)
	user := atomic.LoadInt32(&m.userMessages)
	// check if there are still messages to process (sent after the message loop ended)
	if user > 0 {
		// try setting the mailbox back to running
		if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
			//	fmt.Printf("looping %v %v %v\n", sys, user, m.suspended)
			goto process
		}
	}

}

func (m *MyQueue) run() {
	var msg interface{}
	//	var bok bool

	defer func() {
		if r := recover(); r != nil {
			log.Printf("(m *MyQueue) run() err=%v  msg=%#v \n", r, msg)
		}
	}()

	i, t := 0, m.dispatcher.Throughput()
	for {
		if i > t {
			i = 0
			runtime.Gosched()
		}

		i++

		if msg = m.userMailbox.Pop(); msg != nil {
			atomic.AddInt32(&m.userMessages, -1)
			if len(m.mailboxStats) > 0 && m.mailboxStats[0] != nil {
				m.mailboxStats[0].MessageReceived(msg)
			}
		} else {
			return
		}
	}
}

func (m *MyQueue) Stop() {
	if !m.bStop {
		m.bStop = true
	}
}

// 
func (m *MyQueue) isStop() bool {
	return m.bStop
}

func UnboundedMyQueue(dispatcher Dispatcher, mailboxStats ...Statistics) *MyQueue {

	return &MyQueue{
		userMailbox:  mpsc.New(),
		mailboxStats: mailboxStats,
		dispatcher:   dispatcher,
	}
}

type MySynMsgChan struct {
	MsgData   interface{}
	MsgChan   chan interface{}
	ChanState *int32
}

func GenerateMySynMsg(data interface{}) *MySynMsgChan {
	chanState := new(int32)
	*chanState = 1
	return &MySynMsgChan{data, make(chan interface{}, 1), chanState}
}

func (self *MySynMsgChan) WaitSynReply(d time.Duration) (interface{}, bool) {
	if d < time.Millisecond {
		d = time.Millisecond
	}
	timetick := time.NewTimer(d)
	for {
		select {
		case <-timetick.C:
			atomic.StoreInt32(self.ChanState, 0)
			return nil, false
		case msg, ok := <-self.MsgChan:
			atomic.StoreInt32(self.ChanState, 0)
			timetick.Reset(0)
			if ok {
				return msg, true
			}
		}
	}
	return nil, false
}

使用也很简单
实现 MessageReceived 接口

func CeateUserActor(ws *websocket.Conn, wsIndex uint32, accname, headrul string, platformId, serverid uint32) {
	useractor := new(UserActor)
	useractor.WsConn = ws

	useractor.shareUserState = new(shr.ShareState)
	useractor.shareUserState.State = 1
	
	useractor.remoteip = RetmoteIp(ws.RemoteAddr().String())

......................................................
	useractor.wsConnIndex = wsIndex
	useractor.u32State = STATE_READY_RECV_LOGIN

	useractor.CurMaxItemID = base.ITEM_ID_MIN_NUM //默认开始值
	useractor.CurMaxHeroID = base.HERO_ID_MIN_NUM //默认开始值

	useractor.msgQue = myqueue.UnboundedMyQueue(myqueue.NewDefaultDispatcher(64), useractor)

	useractor.commonBagItemMap = map[uint64]uint64{}
	useractor.endTimeItemMap = treemap.NewWith(treeutils.UInt64Comparator)

	useractor.msgQue.PostUserMessage(&shr.NewSessionQueryUserInfo{})
	SetUpRecv(ws, useractor.msgQue, wsIndex, accname, serverid)
}
..........................
func (self *UserActor) MessageReceived(r interface{}) {
	if self.u32State == STATE_CLOSE {
		return
	}
	switch r.(type) {
	case *shr.NewSessionQueryUserInfo: //新连接查询
		self.doNewSessionQueryPlayer()
	case *shr.ClientRecvNetMsg: //protobuf
		if !self.doClienNetData(r.(*shr.ClientRecvNetMsg).Clientdata) {
			self.doDisconnect(self.wsConnIndex)
		}
	case *shr.NetDisconnect: //网络断开
		dis := r.(*shr.NetDisconnect)
		self.doDisconnect(dis.DisIndex)
		...................................................
}

//
..........................发送消息 会放入mailbox里
func SendMsgToAccMgr(v interface{}) {
	*****.msgque.PostUserMessage(v)
}

同步等待回复
在这里插入图片描述

2:测试,这个已经在项目上用过了,这里不专门再写DEMO测试

3:如果觉得有用,麻烦点个赞,加个收藏


网站公告

今日签到

点亮在社区的每一天
去签到

热门文章