FanIn (merging topics) On this page FanIn component# The FanIn component merges two topics into one.
Configuring# // ...
type Config struct {
// SourceTopics contains topics on which FanIn subscribes.
SourceTopics [] string
// TargetTopic determines the topic on which messages from SourceTopics are published.
TargetTopic string
// CloseTimeout determines how long router should work for handlers when closing.
CloseTimeout time . Duration
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/components/fanin/fanin.go
Running# You need to provide a Publisher and a Subscriber implementation for the FanIn component.
You can find the list of supported Pub/Subs on Supported Pub/Subs page
.
The Publisher and subscriber can be implemented by different message brokers (for example, you can merge a Kafka topic with a RabbitMQ topic).
logger := watermill . NewStdLogger ( false , false )
// create Publisher and Subscriber
pub , err := // ...
sub , err := // ...
fi , err := fanin . NewFanIn (
sub ,
pub ,
fanin . Config {
SourceTopics : upstreamTopics ,
TargetTopic : downstreamTopic ,
},
logger ,
)
if err != nil {
panic ( err )
}
if err := fi . Run ( context . Background ()); err != nil {
panic ( err )
}
Controlling FanIn component# The FanIn component can be stopped by cancelling the context passed to the Run
method or by calling the Close
method.
// ...
func ( f * FanIn ) Run ( ctx context . Context ) error {
return f . router . Run ( ctx )
}
// Running is closed when FanIn is running.
func ( f * FanIn ) Running () chan struct {} {
return f . router . Running ()
}
// Close gracefully closes the FanIn
func ( f * FanIn ) Close () error {
return f . router . Close ()
}
// ...
Full source: github.com/ThreeDotsLabs/watermill/components/fanin/fanin.go