Administrator
发布于 2022-04-13 / 382 阅读 / 0 评论 / 0 点赞

发布订阅

package main

import (
	"fmt"
	"github.com/gomodule/redigo/redis"
	"time"
	"unsafe"
)


// PSubscribeCallback is the handler signature for messages delivered through
// a pattern subscription. It receives the pattern, the channel and the
// message payload, all as strings.
type PSubscribeCallback func (pattern, channel, message string)

// PSubscriber couples a redigo pub/sub connection with the callbacks that
// handle the messages received on it.
type PSubscriber struct {
	// client is the pub/sub wrapper around the Redis connection; it is set
	// by PConnect.
	client redis.PubSubConn
	// cbMap holds the registered callbacks, keyed by the string passed to
	// Psubscribe.
	// NOTE(review): the map is read by the receive goroutine started in
	// PConnect and written by Psubscribe without any locking.
	cbMap map[string]PSubscribeCallback
}


// PConnect dials the Redis server at ip:port, wraps the connection in a
// redis.PubSubConn and starts a goroutine that dispatches every received
// message to the callback registered for it.
//
// ip is a host name or IPv4 literal; an IPv6 literal must already be
// bracketed (e.g. "[::1]"). If the dial fails the error is printed, no
// goroutine is started and the subscriber stays unconnected.
func (c *PSubscriber) PConnect(ip string, port uint16) {
	// Create the map up front so callback registration never writes to a nil
	// map, even when the dial below fails.
	c.cbMap = make(map[string]PSubscribeCallback)

	addr := fmt.Sprintf("%s:%d", ip, port)
	conn, err := redis.Dial("tcp", addr)
	if err != nil {
		fmt.Println("redis dial failed.", err)
		return
	}
	c.client = redis.PubSubConn{Conn: conn}

	go func() {
		for {
			fmt.Println("wait...")
			switch res := c.client.Receive().(type) {
			case redis.Message:
				// Callbacks are stored under the subscribed pattern. With a
				// wildcard pattern the concrete channel differs from it, so
				// the lookup must use the pattern. Messages without a pattern
				// fall back to the channel name.
				key := res.Pattern
				if key == "" {
					key = res.Channel
				}
				cb, ok := c.cbMap[key]
				if !ok || cb == nil {
					fmt.Printf("no callback registered for %q\n", key)
					continue
				}
				// Reinterpret the payload bytes as a string without copying.
				// NOTE(review): this assumes nothing mutates res.Data
				// afterwards; string(res.Data) is the copying alternative.
				message := *(*string)(unsafe.Pointer(&res.Data))
				cb(res.Pattern, res.Channel, message)
			case redis.Subscription:
				fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
			case error:
				// A receive error means the connection is no longer usable;
				// retrying on it would spin forever, so stop the goroutine.
				fmt.Println("error handle...", res)
				return
			}
		}
	}()
}
// Psubscribe subscribes the connection to a channel pattern and registers cb
// as the handler for it.
//
// channel must hold a string, because the callback table is keyed by string;
// any other type is rejected with a printed message instead of a panic.
// Nothing is registered when the subscriber is not connected or when the
// subscribe command fails.
func (c *PSubscriber) Psubscribe(channel interface{}, cb PSubscribeCallback) {
	pattern, ok := channel.(string)
	if !ok {
		fmt.Printf("redis Subscribe error: pattern must be a string, got %T\n", channel)
		return
	}
	if c.client.Conn == nil {
		fmt.Println("redis Subscribe error: not connected.")
		return
	}
	if c.cbMap == nil {
		c.cbMap = make(map[string]PSubscribeCallback)
	}

	// Register before subscribing so a message that arrives as soon as the
	// subscription is active already finds its callback.
	prev, hadPrev := c.cbMap[pattern]
	c.cbMap[pattern] = cb

	if err := c.client.PSubscribe(pattern); err != nil {
		fmt.Println("redis Subscribe error.", err)
		// Undo the registration so a failed subscribe leaves the table as
		// it was before the call.
		if hadPrev {
			c.cbMap[pattern] = prev
		} else {
			delete(c.cbMap, pattern)
		}
	}
}

func TestPubCallback(patter , chann, msg string){
	fmt.Println( "TestPubCallback patter : " + patter + " channel : ", chann, " message : ", msg)
}

// main connects a PSubscriber to a local Redis server, subscribes
// TestPubCallback to the "expired" key-event channel of database 8 and then
// keeps the process alive so the receive goroutine can deliver messages.
func main() {

	fmt.Println("===========main start============")

	var psub PSubscriber
	// 6379 is Redis's default port.
	psub.PConnect("127.0.0.1", 6379)
	psub.Psubscribe("__keyevent@8__:expired", TestPubCallback)
	// Another possible pattern: `__keyspace@0__:cool`

	// Block forever; all message handling happens on the goroutine started
	// by PConnect.
	for {
		time.Sleep(1 * time.Second)
	}
}