HTTP
The HTTP subscriber listens for HTTP requests (for example, webhooks) and outputs them as messages. You can then publish them with any Publisher. Here is an example of sending HTTP messages to Kafka.
The HTTP publisher sends HTTP requests as specified in its configuration. Here is an example of transforming Kafka messages into HTTP webhook requests.
Installation
go get github.com/ThreeDotsLabs/watermill-http
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | no | |
ExactlyOnceDelivery | yes | |
GuaranteedOrder | yes | |
Persistent | no | |
Subscriber configuration
Subscriber configuration is done via the config struct passed to the constructor:
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/subscriber.go
// ...
type SubscriberConfig struct {
	Router               chi.Router
	UnmarshalMessageFunc UnmarshalMessageFunc
}
// ...
Use the Router option of SubscriberConfig to pass your own chi.Router (see chi).
This may be helpful if you’d like to add your own HTTP handlers (e.g. a health check endpoint).
Publisher configuration
Publisher configuration is done via the config struct passed to the constructor:
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/publisher.go
// ...
type PublisherConfig struct {
	MarshalMessageFunc MarshalMessageFunc
	Client             *http.Client
	// if false (default), when server responds with error (>=400) to the webhook request, the response body is logged.
	DoNotLogResponseBodyOnServerError bool
}
// ...
How the message topic and body translate into the URL, method, headers, and payload of the HTTP request is configurable through MarshalMessageFunc.
Use the provided DefaultMarshalMessageFunc to send POST requests to a specific URL:
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/publisher.go
// ...
// MarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)
// DefaultMarshalMessageFunc transforms the message into a HTTP POST request.
// It encodes the UUID and Metadata in request headers.
func DefaultMarshalMessageFunc(url string, msg *message.Message) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(msg.Payload))
	if err != nil {
		return nil, err
	}
	req.Header.Set(HeaderUUID, msg.UUID)
	metadataJson, err := json.Marshal(msg.Metadata)
	if err != nil {
		return nil, errors.Wrap(err, "could not marshal metadata to JSON")
	}
	req.Header.Set(HeaderMetadata, string(metadataJson))
	return req, nil
}
// ...
You can pass your own http.Client to execute the requests or use Go’s default client.
Running
To run the HTTP subscriber, call StartHTTPServer(). It must be called after Subscribe().
When using the subscriber with the router, wait for the router to start first:
<-r.Running()
httpSubscriber.StartHTTPServer()
Subscribing
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/subscriber.go
// ...
// Subscribe adds HTTP handler which will listen in provided url for messages.
//
// Subscribe needs to be called before `StartHTTPServer`.
//
// When request is sent, it will wait for the `Ack`. When Ack is received, 200 HTTP status will be sent.
// When Nack is sent, 500 HTTP status will be sent.
func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message.Message, error) {
// ...
Custom HTTP status codes
To specify a custom HTTP status code to be returned in the response, make the following call during message handling:
// msg is a *message.Message
http.SetResponseStatusCode(msg, http.StatusForbidden)
msg.Nack()