nostr/client/relay.go

597 lines
16 KiB
Go

package client
import (
"bytes"
"fmt"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/puzpuzpuz/xsync/v2"
"mleku.dev/git/nostr/connection"
"mleku.dev/git/nostr/context"
"mleku.dev/git/nostr/envelopes"
"mleku.dev/git/nostr/envelopes/authenvelope"
"mleku.dev/git/nostr/envelopes/closedenvelope"
"mleku.dev/git/nostr/envelopes/countenvelope"
"mleku.dev/git/nostr/envelopes/eoseenvelope"
"mleku.dev/git/nostr/envelopes/eventenvelope"
"mleku.dev/git/nostr/envelopes/noticeenvelope"
"mleku.dev/git/nostr/envelopes/okenvelope"
"mleku.dev/git/nostr/event"
"mleku.dev/git/nostr/filter"
"mleku.dev/git/nostr/filters"
"mleku.dev/git/nostr/interfaces/enveloper"
"mleku.dev/git/nostr/interfaces/subscriptionoption"
"mleku.dev/git/nostr/kind"
"mleku.dev/git/nostr/normalize"
"mleku.dev/git/nostr/relayinfo"
"mleku.dev/git/nostr/subscription"
"mleku.dev/git/nostr/tag"
"mleku.dev/git/nostr/tags"
"mleku.dev/git/nostr/timestamp"
"mleku.dev/git/slog"
)
var log, chk = slog.New(os.Stderr)
type Status int
var subscriptionIDCounter atomic.Int32
type T struct {
closeMutex sync.Mutex
url string
RequestHeader http.Header // e.g. for origin header
Connection *connection.C
Subscriptions *xsync.MapOf[string, *subscription.T]
ConnectionError error
connectionContext context.T // will be canceled when connection closes
connectionContextCancel context.F
challenge string // NIP-42 challenge, only keep the last
AuthRequired chan struct{}
notices chan string // NIP-01 NOTICEs
okCallbacks *xsync.MapOf[string, func(bool, string)]
writeQueue chan writeRequest
subscriptionChannelCloseQueue chan *subscription.T
// custom things that aren't often used
//
AssumeValid bool // skip verifying signatures of events from this relay
}
func (r *T) URL() string { return r.url }
func (r *T) Delete(key string) { r.Subscriptions.Delete(key) }
type writeRequest struct {
msg []byte
answer chan error
}
// NewRelay returns a new relay. The relay connection will be closed when the
// context is canceled.
func NewRelay(c context.T, url string, opts ...Option) *T {
ctx, cancel := context.Cancel(c)
r := &T{
url: normalize.URL(url),
connectionContext: ctx,
connectionContextCancel: cancel,
Subscriptions: xsync.NewMapOf[*subscription.T](),
okCallbacks: xsync.NewMapOf[func(bool, string)](),
writeQueue: make(chan writeRequest),
subscriptionChannelCloseQueue: make(chan *subscription.T),
AuthRequired: make(chan struct{}),
}
for _, opt := range opts {
switch o := opt.(type) {
case WithNoticeHandler:
r.notices = make(chan string)
go func() {
for n := range r.notices {
o(n)
}
}()
}
}
return r
}
// Connect returns a relay object connected to url. Once successfully
// connected, cancelling ctx has no effect. To close the connection, call
// r.Close().
func Connect(c context.T, url string, opts ...Option) (*T, error) {
r := NewRelay(c, url, opts...)
err := r.Connect(c)
return r, err
}
// ConnectWithAuth auths with the relay, checks if its NIP-11 says auth-required
// and uses the provided sec to sign the auth challenge.
func ConnectWithAuth(c context.T, url, sec string,
opts ...Option) (rl *T, err error) {
var inf *relayinfo.T
if inf, err = relayinfo.Fetch(c, url); chk.E(err) {
return
}
if rl, err = Connect(c, url, opts...); chk.E(err) {
return
}
// if NIP-11 doesn't say auth-required, we are done
if !inf.Limitation.AuthRequired {
return
}
// otherwise, expect auth immediately and sign on it. some relays may not send
// the auth challenge without being prompted by a req envelope but fuck them.
// auth-required in nip-11 should mean auth on connect. period.
authed := false
for i := 0; i < 2; i++ {
// but just in case, we will do this twice if need be. The first try may
// time out because the relay waits for a req, or because the auth
// doesn't trigger until a message is received.
select {
case <-rl.AuthRequired:
log.T.Ln("authing to relay")
if err = rl.Auth(c,
func(evt *event.T) error { return evt.Sign(sec) }); chk.D(err) {
return
}
authed = true
case <-time.After(2 * time.Second):
}
if authed {
log.T.Ln("authed to relay")
break
}
// to trigger this if auth wasn't immediately demanded, send out a dummy
// empty req.
one := 1
filt := filters.T{
{Limit: &one},
}
var sub *subscription.T
if sub, err = rl.Subscribe(c, filt); chk.E(err) {
// not sure what to do here
}
sub.Close()
// at this point if we haven't received an auth there is something wrong
// with the relay.
}
return
}
// When instantiating relay connections, some options may be passed.
// Option is the type of the argument passed for that.
type Option interface {
IsRelayOption()
}
// WithNoticeHandler just takes notices and is expected to do something with
// them. when not given, defaults to logging the notices.
type WithNoticeHandler func(notice string)
func (_ WithNoticeHandler) IsRelayOption() {}
var _ Option = (WithNoticeHandler)(nil)
// String just returns the relay URL.
func (r *T) String() string {
return r.url
}
// Context retrieves the context that is associated with this relay connection.
func (r *T) Context() context.T { return r.connectionContext }
// IsConnected returns true if the connection to this relay seems to be active.
func (r *T) IsConnected() bool { return r.connectionContext.Err() == nil }
// Connect tries to establish a websocket connection to r.URL. If the context
// expires before the connection is complete, an error is returned. Once
// successfully connected, context expiration has no effect: call r.Close to
// close the connection.
//
// The underlying relay connection will use a background context. If you want to
// pass a custom context to the underlying relay connection, use NewRelay() and
// then Relay.Connect().
func (r *T) Connect(c context.T) (err error) {
if r.connectionContext == nil || r.Subscriptions == nil {
return fmt.Errorf("relay must be initialized with a call to NewRelay()")
}
if r.url == "" {
return fmt.Errorf("invalid relay URL '%s'", r.URL())
}
if _, ok := c.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds
var cancel context.F
c, cancel = context.Timeout(c, 7*time.Second)
defer cancel()
}
var conn *connection.C
conn, err = connection.NewConnection(c, r.url, r.RequestHeader)
if err != nil {
return fmt.Errorf("error opening websocket to '%s': %w", r.URL(), err)
}
r.Connection = conn
// ping every 29 seconds
ticker := time.NewTicker(29 * time.Second)
// to be used when the connection is closed
go func() {
<-r.connectionContext.Done()
// close these things when the connection is closed
if r.notices != nil {
close(r.notices)
}
// stop the ticker
ticker.Stop()
// close all subscriptions
r.Subscriptions.Range(func(_ string, sub *subscription.T) bool {
go sub.Unsub()
return true
})
}()
// queue all write operations here so we don't do mutex spaghetti
go func() {
var err error
for {
select {
case <-ticker.C:
err = wsutil.WriteClientMessage(r.Connection.Conn, ws.OpPing,
nil)
if err != nil {
log.D.F("{%s} error writing ping: %v; closing websocket",
r.URL(), err)
chk.D(r.Close()) // this should trigger a context cancelation
return
}
case wr := <-r.writeQueue:
// all write requests will go through this to prevent races
if err = r.Connection.WriteMessage(wr.msg); err != nil {
wr.answer <- err
}
close(wr.answer)
case <-r.connectionContext.Done():
// stop here
return
}
}
}()
// general message reader loop
go r.MessageReadLoop(conn)
return nil
}
func (r *T) MessageReadLoop(conn *connection.C) {
buf := new(bytes.Buffer)
var err error
for {
buf.Reset()
if err = conn.ReadMessage(r.connectionContext, buf); err != nil {
r.ConnectionError = err
chk.D(r.Close())
break
}
message := buf.Bytes()
log.T.F("{%s} received %v", r.URL(), string(message))
var envelope enveloper.I
envelope, _, err = envelopes.ProcessEnvelope(message)
if envelope == nil {
continue
}
switch env := envelope.(type) {
case *noticeenvelope.T:
// see WithNoticeHandler
if r.notices != nil {
r.notices <- env.Text
} else {
log.D.F("NOTICE from %s: '%s'", r.URL(), env.Text)
}
case *authenvelope.Challenge:
r.challenge = env.Challenge
log.D.Ln("challenge", r.challenge)
close(r.AuthRequired)
case *eventenvelope.T:
if env.SubscriptionID == "" {
continue
}
if s, ok := r.Subscriptions.Load(env.SubscriptionID.String()); !ok {
log.D.F("{%s} no subscription with id '%s'",
r.URL(), env.SubscriptionID.String())
continue
} else {
// check if the event matches the desired filter, ignore otherwise
if !s.Filters.Match(env.Event) {
log.D.F("{%s} filter does not match: %v ~ %v",
r.URL(), s.Filters, env.Event)
continue
}
// check signature, ignore invalid, except from trusted (AssumeValid) relays
if !r.AssumeValid {
if ok, err = env.Event.CheckSignature(); !ok {
errmsg := ""
if chk.D(err) {
errmsg = err.Error()
}
log.D.F("{%s} bad signature on %s; %s",
r.URL(), env.Event.ID, errmsg)
continue
}
}
// dispatch this to the internal .events channel of the
// subscription
s.DispatchEvent(env.Event)
}
case *eoseenvelope.T:
log.D.Ln("eose", r.Subscriptions.Size())
if s, ok := r.Subscriptions.Load(env.Sub.String()); ok {
log.D.Ln("dispatching eose", env.Sub.String())
s.DispatchEose()
}
case *closedenvelope.T:
if s, ok := r.Subscriptions.Load(env.ID.String()); ok {
s.DispatchClosed(env.Reason)
}
case *countenvelope.Response:
if s, ok := r.Subscriptions.Load(env.ID.String()); ok &&
s.CountResult != nil {
s.CountResult <- env.Count
}
case *okenvelope.T:
if okCallback, exist := r.okCallbacks.Load(env.ID.String()); exist {
okCallback(env.OK, env.Reason)
} else {
log.D.F("{%s} got an unexpected OK message for event %s",
r.URL(), env.ID)
}
}
}
}
// Write queues a message to be sent to the relay.
func (r *T) Write(msg []byte) <-chan error {
ch := make(chan error)
select {
case r.writeQueue <- writeRequest{msg: msg, answer: ch}:
case <-r.connectionContext.Done():
go func() { ch <- fmt.Errorf("connection closed") }()
}
return ch
}
// Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an
// OK response.
func (r *T) Publish(c context.T, ev *event.T) error {
return r.publish(c, ev.ID.String(), &eventenvelope.T{Event: ev})
}
// Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK
// response.
func (r *T) Auth(c context.T, sign func(ev *event.T) error) error {
log.I.Ln("sending auth to relay", r.URL())
authEvent := &event.T{
CreatedAt: timestamp.Now(),
Kind: kind.ClientAuthentication,
Tags: tags.T{
tag.T{"relay", r.URL()},
tag.T{"challenge", r.challenge},
},
Content: "",
}
if err := sign(authEvent); chk.D(err) {
return fmt.Errorf("error signing auth event: %w", err)
}
return r.publish(c, authEvent.ID.String(),
&authenvelope.Response{Event: authEvent})
}
// publish can be used both for EVENT and for AUTH
func (r *T) publish(c context.T, id string, env enveloper.I) error {
var err error
var cancel context.F
if _, ok := c.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds
c, cancel = context.Timeout(c, 7*time.Second)
defer cancel()
} else {
// otherwise make the context cancellable so we can stop everything upon
// receiving an "OK"
c, cancel = context.Cancel(c)
defer cancel()
}
// listen for an OK callback
gotOk := false
r.okCallbacks.Store(id, func(ok bool, reason string) {
gotOk = true
if !ok {
err = log.E.Err("msg: %s", reason)
}
cancel()
})
defer r.okCallbacks.Delete(id)
// publish event
var enb []byte
enb, err = env.MarshalJSON()
log.T.F("{%s} sending %v", r.URL(), string(enb))
if err = <-r.Write(enb); err != nil {
return err
}
for {
select {
case <-c.Done():
// this will be called when we get an OK or when the context has been canceled
if gotOk {
return err
}
return c.Err()
case <-r.connectionContext.Done():
// this is caused when we lose connectivity
return err
}
}
}
// Subscribe sends a "REQ" command to the relay r as in NIP-01. Events are
// returned through the channel sub.Events. The subscription is closed when
// context ctx is cancelled ("CLOSE" in NIP-01).
//
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or
// ensuring their `context.T` will be canceled at some point. Failure to do that
// will result in a huge number of halted goroutines being created.
func (r *T) Subscribe(c context.T, f filters.T,
opts ...subscriptionoption.I) (*subscription.T, error) {
sub := r.PrepareSubscription(c, f, opts...)
if err := sub.Fire(); err != nil {
return nil, fmt.Errorf("couldn't subscribe to %v at %s: %w", f, r.URL(),
err)
}
return sub, nil
}
// PrepareSubscription creates a subscription, but doesn't fire it.
//
// Remember to cancel subscriptions, either by calling `.Unsub()` on them or
// ensuring their `context.T` will be canceled at some point. Failure to do that
// will result in a huge number of halted goroutines being created.
func (r *T) PrepareSubscription(c context.T, f filters.T,
opts ...subscriptionoption.I) *subscription.T {
if r.Connection == nil {
panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()"))
}
current := subscriptionIDCounter.Add(1)
ctx, cancel := context.Cancel(c)
sub := &subscription.T{
Relay: r,
Context: ctx,
Cancel: cancel,
Counter: int(current),
Events: make(event.C),
EndOfStoredEvents: make(chan struct{}),
ClosedReason: make(chan string, 1),
Filters: f,
}
for _, opt := range opts {
switch o := opt.(type) {
case subscription.WithLabel:
sub.Label = string(o)
}
}
id := sub.GetID()
r.Subscriptions.Store(id, sub)
// start handling events, eose, unsub etc:
go sub.Start()
return sub
}
func (r *T) QuerySync(c context.T, f *filter.T,
opts ...subscriptionoption.I) ([]*event.T, error) {
log.D.Ln(f.ToObject().String())
sub, err := r.Subscribe(c, filters.T{f}, opts...)
if err != nil {
return nil, err
}
defer sub.Unsub()
if _, ok := c.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds
var cancel context.F
c, cancel = context.Timeout(c, 7*time.Second)
defer cancel()
}
var events []*event.T
for {
select {
case evt := <-sub.Events:
if evt == nil {
log.I.Ln("channel is closed")
return events, nil
}
events = append(events, evt)
case <-sub.EndOfStoredEvents:
log.I.Ln("EOSE")
return events, nil
case <-c.Done():
log.I.Ln("sub context done")
return events, nil
}
}
}
func (r *T) Count(c context.T, filters filters.T,
opts ...subscriptionoption.I) (int, error) {
sub := r.PrepareSubscription(c, filters, opts...)
sub.CountResult = make(chan int)
if err := sub.Fire(); err != nil {
return 0, err
}
defer sub.Unsub()
if _, ok := c.Deadline(); !ok {
// if no timeout is set, force it to 7 seconds
var cancel context.F
c, cancel = context.Timeout(c, 7*time.Second)
defer cancel()
}
for {
select {
case count := <-sub.CountResult:
return count, nil
case <-c.Done():
return 0, c.Err()
}
}
}
func (r *T) Close() error {
r.closeMutex.Lock()
defer r.closeMutex.Unlock()
if r.connectionContextCancel == nil {
return fmt.Errorf("relay not connected")
}
r.connectionContextCancel()
r.connectionContextCancel = nil
return r.Connection.Close()
}