HTTP

The HTTP subscriber listens to HTTP requests (for example - webhooks) and outputs them as messages. You can then post them to any Publisher. Here is an example with sending HTTP messages to Kafka .

The HTTP publisher sends HTTP requests as specified in its configuration. Here is an example with transforming Kafka messages into HTTP webhook requests .

Installation

go get github.com/ThreeDotsLabs/watermill-http/v2

Characteristics

FeatureImplementsNote
ConsumerGroupsno
ExactlyOnceDeliveryyes
GuaranteedOrderyes
Persistentno

Subscriber configuration

Subscriber configuration is done via the config struct passed to the constructor:

// ...
type SubscriberConfig struct {
	Router               chi.Router
	UnmarshalMessageFunc UnmarshalMessageFunc
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/subscriber.go

You can use the Router config option to SubscriberConfig to pass your own chi.Router (see chi ). This may be helpful if you’d like to add your own HTTP handlers (e.g. a health check endpoint).

Publisher configuration

Publisher configuration is done via the config struct passed to the constructor:

// ...
type PublisherConfig struct {
	MarshalMessageFunc MarshalMessageFunc
	Client             *http.Client
	// if false (default), when server responds with error (>=400) to the webhook request, the response body is logged.
	DoNotLogResponseBodyOnServerError bool
}
// ...

Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/publisher.go

How the message topic and body translate into the URL, method, headers, and payload of the HTTP request is highly configurable through the use of MarshalMessageFunc. Use the provided DefaultMarshalMessageFunc to send POST requests to a specific url:

// ...
// MarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)

// DefaultMarshalMessageFunc transforms the message into a HTTP POST request.
// It encodes the UUID and Metadata in request headers.
func DefaultMarshalMessageFunc(url string, msg *message.Message) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(msg.Payload))
	if err != nil {
		return nil, err
	}

	req.Header.Set(HeaderUUID, msg.UUID)

	metadataJson, err := json.Marshal(msg.Metadata)
	if err != nil {
		return nil, errors.Wrap(err, "could not marshal metadata to JSON")
	}
	req.Header.Set(HeaderMetadata, string(metadataJson))
	return req, nil
}

// ...

Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/publisher.go

You can pass your own http.Client to execute the requests or use Golang’s default client.

Running

To run HTTP subscriber you need to run StartHTTPServer(). It needs to be run after Subscribe().

When using with the router, you should wait for the router to start.

<-r.Running()
httpSubscriber.StartHTTPServer()

Subscribing

// ...
// Subscribe adds HTTP handler which will listen in provided url for messages.
//
// Subscribe needs to be called before `StartHTTPServer`.
//
// When request is sent, it will wait for the `Ack`. When Ack is received 200 HTTP status wil be sent.
// When Nack is sent, 500 HTTP status will be sent.
func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message.Message, error) {
// ...

Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/subscriber.go

Custom HTTP status codes

To specify a custom HTTP status code, which will returned as response, you can use following call during message handling:

// msg is a *message.Message
http.SetResponseStatusCode(msg, http.StatusForbidden)
msg.Nack()

Check our online hands-on training