FanIn (merging topics)

FanIn component

The FanIn component merges messages from multiple source topics into a single target topic.
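
To make the idea concrete, here is a minimal sketch of the fan-in pattern using plain Go channels in place of topics. It is only a conceptual illustration under that assumption, not how Watermill implements the component. The `merge`, `produce`, and `collect` helpers are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge forwards every value from each source channel to a single output
// channel. The output is closed once all sources have been drained.
func merge(sources ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(sources))
	for _, src := range sources {
		go func(src <-chan string) {
			defer wg.Done()
			for msg := range src {
				out <- msg
			}
		}(src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// produce returns a pre-filled, closed channel standing in for a source topic.
func produce(msgs ...string) <-chan string {
	ch := make(chan string, len(msgs))
	for _, m := range msgs {
		ch <- m
	}
	close(ch)
	return ch
}

// collect drains a channel and returns its values sorted, because the
// relative order of messages from different sources is not guaranteed.
func collect(ch <-chan string) []string {
	var got []string
	for msg := range ch {
		got = append(got, msg)
	}
	sort.Strings(got)
	return got
}

func main() {
	merged := collect(merge(produce("a1", "a2"), produce("b1")))
	fmt.Println(merged) // prints [a1 a2 b1]
}
```

As in this sketch, a consumer of the merged stream should not rely on any particular interleaving of messages that come from different sources.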

Configuring

// ...
type Config struct {
	// SourceTopics contains topics on which FanIn subscribes.
	SourceTopics []string

	// TargetTopic determines the topic on which messages from SourceTopics are published.
	TargetTopic string

	// CloseTimeout determines how long router should work for handlers when closing.
	CloseTimeout time.Duration
}
// ...

Full source: github.com/ThreeDotsLabs/watermill/components/fanin/fanin.go
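
As an illustration of filling in these fields, the sketch below uses a local stand-in struct that mirrors the Config fields shown above, so it compiles without the Watermill module. The topic names and the `checkConfig` helper are hypothetical, and the checks encode assumptions about what a sensible configuration looks like rather than the library's own validation rules.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the fields of fanin.Config shown above.
// It is a stand-in for this sketch, not the library type.
type Config struct {
	SourceTopics []string
	TargetTopic  string
	CloseTimeout time.Duration
}

// checkConfig is a hypothetical pre-flight check, not part of Watermill.
func checkConfig(c Config) error {
	if len(c.SourceTopics) == 0 {
		return errors.New("at least one source topic is required")
	}
	if c.TargetTopic == "" {
		return errors.New("target topic is required")
	}
	for _, t := range c.SourceTopics {
		if t == "" {
			return errors.New("source topic must not be empty")
		}
		// Publishing back into a source topic would feed messages into a loop.
		if t == c.TargetTopic {
			return fmt.Errorf("topic %q is both a source and the target", t)
		}
	}
	return nil
}

func main() {
	cfg := Config{
		SourceTopics: []string{"orders.eu", "orders.us"}, // hypothetical names
		TargetTopic:  "orders.all",                       // hypothetical name
		CloseTimeout: 10 * time.Second,
	}
	fmt.Println(checkConfig(cfg)) // prints <nil>
}
```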

Running

You need to provide a Publisher and a Subscriber implementation for the FanIn component.

You can find the list of supported Pub/Subs on the Supported Pub/Subs page. The Publisher and Subscriber can be backed by different message brokers (for example, you can merge a Kafka topic with a RabbitMQ topic).


logger := watermill.NewStdLogger(false, false)

// create Publisher and Subscriber
pub, err := // ...
sub, err := // ...

fi, err := fanin.NewFanIn(
    sub,
    pub,
    fanin.Config{
        SourceTopics: upstreamTopics,
        TargetTopic:  downstreamTopic,
    },
    logger,
)
if err != nil {
    panic(err)
}

if err := fi.Run(context.Background()); err != nil {
    panic(err)
}

Controlling FanIn component

The FanIn component can be stopped by cancelling the context passed to the Run method or by calling the Close method.

// ...
func (f *FanIn) Run(ctx context.Context) error {
	return f.router.Run(ctx)
}

// Running is closed when FanIn is running.
func (f *FanIn) Running() chan struct{} {
	return f.router.Running()
}

// Close gracefully closes the FanIn
func (f *FanIn) Close() error {
	return f.router.Close()
}
// ...

Full source: github.com/ThreeDotsLabs/watermill/components/fanin/fanin.go

