Merging two topics into one

Merging two topics into one with the FanIn component.

FanIn component

The FanIn component merges two topics into one.


Full source:

// ...
type Config struct {
	// SourceTopics contains topics on which FanIn subscribes.
	SourceTopics []string

	// TargetTopic determines the topic on which messages from SourceTopics are published.
	TargetTopic string

	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
// ...


You need to provide a Publisher and a Subscriber implementation for the FanIn component.

You can find the list of supported Pub/Subs on Supported Pub/Subs page. The Publisher and subscriber can be implemented by different message brokers (for example, you can merge a Kafka topic with a RabbitMQ topic).

logger := watermill.NewStdLogger(false, false)

// create Publisher and Subscriber
pub, err := // ...
sub, err := // ...

fi, err := fanin.NewFanIn(
        SourceTopics: upstreamTopics,
        TargetTopic:  downstreamTopic,
if err != nil {

if err := fi.Run(context.Background()); err != nil {

Controlling FanIn component

The FanIn component can be stopped by cancelling the context passed to the Run method or by calling the Close method.

Full source:

// ...
func (f *FanIn) Run(ctx context.Context) error {
	return f.router.Run(ctx)

// Running is closed when FanIn is running.
func (f *FanIn) Running() chan struct{} {
	return f.router.Running()

// Close gracefully closes the FanIn
func (f *FanIn) Close() error {
	return f.router.Close()
// ...