Receiving Messages
Beyond On, Pods provide several methods for receiving messages in different scenarios. Receive methods come in two types: asynchronous and synchronous. Synchronous receive methods block until the desired message is received, whereas asynchronous receive methods run in the background and do not block at the call site.
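The difference between the two types can be illustrated without Grav itself. The following is a minimal sketch that uses only Go channels and goroutines; receiveSync and receiveAsync are hypothetical names chosen for this illustration and are not part of the Grav API.

```go
package main

// receiveSync models a synchronous receive: the caller is blocked
// until one message arrives on msgs, and that message is returned.
func receiveSync(msgs <-chan string) string {
	return <-msgs
}

// receiveAsync models an asynchronous receive: it returns immediately
// and calls handle for every message in a background goroutine until
// msgs is closed. The returned channel is closed when the background
// goroutine has finished, so the caller can wait for it if needed.
func receiveAsync(msgs <-chan string, handle func(string)) <-chan struct{} {
	done := make(chan struct{})

	go func() {
		defer close(done)

		for msg := range msgs {
			handle(msg)
		}
	}()

	return done
}
```

A caller of receiveSync cannot proceed until a message arrives, while a caller of receiveAsync continues right away and only blocks if it chooses to read from the returned channel.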
On and OnType are async, and they are shown here:
package main

import (
	"fmt"

	"github.com/suborbital/grav/grav"
)

func receiveMessagesAsync() {
	g := grav.New()

	// create a pod and set a function to be called on each message
	// if the msgFunc returns an error, it indicates to the Grav instance that
	// the message could not be handled.
	p := g.Connect()
	p.On(func(msg grav.Message) error {
		fmt.Println("message received:", string(msg.Data()))

		return nil
	})

	// create a second pod and set it to receive only messages of a specific type
	// errors indicate that a message could not be handled.
	p2 := g.Connect()
	p2.OnType("grav.specialmessage", func(msg grav.Message) error {
		fmt.Println("special message received:", string(msg.Data()))

		return nil
	})
}
The sync receive methods WaitOn and WaitUntil are shown here:
package main

import (
	"fmt"
	"time"

	"github.com/suborbital/grav/grav"
)

func receiveMessagesSync() {
	g := grav.New()

	go sendMessages(g.Connect())

	// create a pod and set it to wait for a specific message.
	// WaitOn will block forever if the message is never received.
	p3 := g.Connect()
	p3.WaitOn(func(msg grav.Message) error {
		if msg.Type() != "grav.desiredmessage" {
			return grav.ErrMsgNotWanted
		}

		fmt.Println("desired message received:", string(msg.Data()))

		return nil
	})

	// create a pod that waits up to 3 seconds for a specific message
	// ErrWaitTimeout is returned if the timeout elapses
	p4 := g.Connect()
	err := p4.WaitUntil(grav.Timeout(3), func(msg grav.Message) error {
		if msg.Type() != "grav.importantmessage" {
			return grav.ErrMsgNotWanted
		}

		fmt.Println("important message received:", string(msg.Data()))

		return nil
	})

	// timeout will occur because message is never received
	fmt.Println(err)
}

func sendMessages(p *grav.Pod) {
	<-time.After(time.Millisecond * 500)
	p.Send(grav.NewMsg("grav.desiredmessage", []byte("desired!")))
}
The sync receive methods set the Pod's receive function to nil after returning, so the Pod stops accepting new messages. The async methods, however, cause the Pod to continue receiving until the Pod is disconnected or its receive function is set to nil, which can be done with pod.On(nil).
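The effect of a nil receive function can be pictured with a toy model. This is only a sketch under stated assumptions: toyPod and its deliver method are hypothetical and do not reflect how Grav implements Pods internally. The model simply holds one receive function and drops messages while that function is nil.

```go
package main

import "sync"

// toyPod is a hypothetical stand-in for a Pod, not Grav's implementation.
// It holds at most one receive function; nil means "not receiving".
type toyPod struct {
	mu     sync.Mutex
	onFunc func(msg string) error
}

// On installs the receive function. Passing nil stops the pod from receiving.
func (p *toyPod) On(f func(msg string) error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.onFunc = f
}

// deliver hands msg to the current receive function and reports whether
// the pod accepted it. A pod with no receive function drops the message.
func (p *toyPod) deliver(msg string) bool {
	p.mu.Lock()
	f := p.onFunc
	p.mu.Unlock()

	if f == nil {
		return false
	}

	_ = f(msg) // handler errors are ignored in this sketch

	return true
}
```

In this model, a pod accepts messages only between a call to On with a function and a later call to On(nil), which mirrors the behaviour described above for the async methods.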
For sync receive methods, the grav.ErrMsgNotWanted error indicates that the desired message has not yet been received, so the Pod keeps waiting. Returning nil or any other error tells the Pod that the operation has completed, and that return value is propagated to the caller, which can inspect it if desired.
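The sentinel-error convention can be sketched as a plain wait loop. The code below is an illustration using only the standard library, not Grav's implementation: waitUntil, errNotWanted, and errTimeout are hypothetical names that stand in for WaitUntil, grav.ErrMsgNotWanted, and ErrWaitTimeout.

```go
package main

import (
	"errors"
	"time"
)

// Hypothetical sentinel errors for this sketch; Grav defines its own.
var (
	errNotWanted = errors.New("message not wanted")
	errTimeout   = errors.New("wait timed out")
)

// waitUntil calls handle for each message received from msgs. It keeps
// waiting while handle returns errNotWanted, and returns handle's result
// (nil or any other error) as soon as the handler reports completion.
// It returns errTimeout if that does not happen before timeout elapses.
func waitUntil(msgs <-chan string, timeout time.Duration, handle func(string) error) error {
	deadline := time.After(timeout)

	for {
		select {
		case msg, ok := <-msgs:
			if !ok {
				// a closed channel delivers nothing more; a nil channel
				// blocks forever, so only the deadline can fire now
				msgs = nil
				continue
			}

			if err := handle(msg); !errors.Is(err, errNotWanted) {
				return err
			}
		case <-deadline:
			return errTimeout
		}
	}
}
```

A handler passed to waitUntil would return errNotWanted for every message it wants to skip and nil for the one it was waiting for, which is the same shape as the WaitOn and WaitUntil handlers shown earlier.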