Title here
Summary here
The FanIn component merges two topics into one.
// ...
type Config struct {
// SourceTopics contains topics on which FanIn subscribes.
SourceTopics []string
// TargetTopic determines the topic on which messages from SourceTopics are published.
TargetTopic string
// CloseTimeout determines how long router should work for handlers when closing.
CloseTimeout time.Duration
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/components/fanin/fanin.go
You need to provide a Publisher and a Subscriber implementation for the FanIn component.
You can find the list of supported Pub/Subs on Supported Pub/Subs page . The Publisher and subscriber can be implemented by different message brokers (for example, you can merge a Kafka topic with a RabbitMQ topic).
logger := watermill.NewStdLogger(false, false)
// create Publisher and Subscriber
pub, err := // ...
sub, err := // ...
fi, err := fanin.NewFanIn(
sub,
pub,
fanin.Config{
SourceTopics: upstreamTopics,
TargetTopic: downstreamTopic,
},
logger,
)
if err != nil {
panic(err)
}
if err := fi.Run(context.Background()); err != nil {
panic(err)
}
The FanIn component can be stopped by cancelling the context passed to the Run
method or by calling the Close
method.
// ...
func (f *FanIn) Run(ctx context.Context) error {
return f.router.Run(ctx)
}
// Running is closed when FanIn is running.
func (f *FanIn) Running() chan struct{} {
return f.router.Running()
}
// Close gracefully closes the FanIn
func (f *FanIn) Close() error {
return f.router.Close()
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/components/fanin/fanin.go