HTTP
The HTTP subscriber listens to HTTP requests (for example - webhooks) and outputs them as messages. You can then post them to any Publisher. Here is an example with sending HTTP messages to Kafka .
The HTTP publisher sends HTTP requests as specified in its configuration. Here is an example with transforming Kafka messages into HTTP webhook requests .
Installation
go get github.com/ThreeDotsLabs/watermill-http/v2
Characteristics
Feature | Implements | Note |
---|---|---|
ConsumerGroups | no | |
ExactlyOnceDelivery | yes | |
GuaranteedOrder | yes | |
Persistent | no |
Subscriber configuration
Subscriber configuration is done via the config struct passed to the constructor:
// ...
type SubscriberConfig struct {
Router chi.Router
UnmarshalMessageFunc UnmarshalMessageFunc
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/subscriber.go
You can use the Router
config option to SubscriberConfig
to pass your own chi.Router
(see chi
).
This may be helpful if you’d like to add your own HTTP handlers (e.g. a health check endpoint).
Publisher configuration
Publisher configuration is done via the config struct passed to the constructor:
// ...
type PublisherConfig struct {
MarshalMessageFunc MarshalMessageFunc
Client *http.Client
// if false (default), when server responds with error (>=400) to the webhook request, the response body is logged.
DoNotLogResponseBodyOnServerError bool
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/publisher.go
How the message topic and body translate into the URL, method, headers, and payload of the HTTP request is highly configurable through the use of MarshalMessageFunc
.
Use the provided DefaultMarshalMessageFunc
to send POST requests to a specific url:
// ...
// MarshalMessageFunc transforms the message into a HTTP request to be sent to the specified url.
type MarshalMessageFunc func(url string, msg *message.Message) (*http.Request, error)
// DefaultMarshalMessageFunc transforms the message into a HTTP POST request.
// It encodes the UUID and Metadata in request headers.
func DefaultMarshalMessageFunc(url string, msg *message.Message) (*http.Request, error) {
req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(msg.Payload))
if err != nil {
return nil, err
}
req.Header.Set(HeaderUUID, msg.UUID)
metadataJson, err := json.Marshal(msg.Metadata)
if err != nil {
return nil, errors.Wrap(err, "could not marshal metadata to JSON")
}
req.Header.Set(HeaderMetadata, string(metadataJson))
return req, nil
}
// ...
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/publisher.go
You can pass your own http.Client
to execute the requests or use Golang’s default client.
Running
To run HTTP subscriber you need to run StartHTTPServer()
. It needs to be run after Subscribe()
.
When using with the router, you should wait for the router to start.
<-r.Running()
httpSubscriber.StartHTTPServer()
Subscribing
// ...
// Subscribe adds HTTP handler which will listen in provided url for messages.
//
// Subscribe needs to be called before `StartHTTPServer`.
//
// When request is sent, it will wait for the `Ack`. When Ack is received 200 HTTP status wil be sent.
// When Nack is sent, 500 HTTP status will be sent.
func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message.Message, error) {
// ...
Full source: github.com/ThreeDotsLabs/watermill-http/pkg/http/subscriber.go
Custom HTTP status codes
To specify a custom HTTP status code, which will returned as response, you can use following call during message handling:
// msg is a *message.Message
http.SetResponseStatusCode(msg, http.StatusForbidden)
msg.Nack()