PubSub (Издатель-подписчик)
Паттерн проектирования PubSub
Описание PubSub
Издатель-подписчик (англ. PubSub) —поведенческий шаблон проектирования передачи сообщений, в котором отправители сообщений, именуемые издателями (англ. publishers), напрямую не привязаны программным кодом отправки сообщений к подписчикам (англ. subscribers). Вместо этого сообщения делятся на классы и не содержат сведений о своих подписчиках, если таковые есть. Аналогичным образом подписчики имеют дело с одним или несколькими классами сообщений, абстрагируясь от конкретных издателей..
Шаблон издатель-подписчик представляет собой расширение шаблона наблюдатель, в который добавлено описание канала событий (англ. event channel), специально предназначенного для оповещения о событиях.
Структура.
В модели издатель-подписчик подписчики обычно получают только подмножество всех опубликованных сообщений. Процесс отбора сообщений для получения и их обработка называется фильтрацией. Существуют две основных формы фильтрации: основанная на теме (англ. topic) и основанная на содержимом.
В системе, основанной на теме, сообщения публикуются в «темах» или именованных логических каналах. Подписчики в таких системах будут получать все сообщения, опубликованные в темах, на которые они подписались, и все подписчики, подписавшиеся на одну и ту же тему, будут получать те же самые сообщения. Издатель отвечает за определение классов сообщений, на которые подписываются подписчики.
В системе, основанной на содержимом, сообщения доставляются подписчикам только в том случае, если атрибуты или содержимое этих сообщений допускаются подписчиком. В данной системе подписчик отвечает за классификацию сообщений.
Некоторые системы представляют собой гибрид между этими двумя системами: издатель отправляет сообщения в тему, в то время как подписчики регистрируют подписку, основанную на содержимом для одной или более тем.
Паттерн описан Андреем Болониным.
Примеры реализации
// PubSub Pattern in JavaScript
class PubSub {
constructor() {
this.subscribers = new Map();
}
subscribe(topic, callback) {
if (!this.subscribers.has(topic)) {
this.subscribers.set(topic, []);
}
this.subscribers.get(topic).push(callback);
console.log(`Subscribed to topic: ${topic}`);
}
unsubscribe(topic, callback) {
if (this.subscribers.has(topic)) {
const callbacks = this.subscribers.get(topic);
const index = callbacks.indexOf(callback);
if (index > -1) {
callbacks.splice(index, 1);
console.log(`Unsubscribed from topic: ${topic}`);
}
}
}
publish(topic, data) {
if (this.subscribers.has(topic)) {
const callbacks = this.subscribers.get(topic);
console.log(`Publishing to topic: ${topic}`, data);
callbacks.forEach(callback => {
try {
callback(data);
} catch (error) {
console.error(`Error in subscriber for topic ${topic}:`, error);
}
});
} else {
console.log(`No subscribers for topic: ${topic}`);
}
}
getSubscriberCount(topic) {
return this.subscribers.has(topic) ? this.subscribers.get(topic).length : 0;
}
}
// Usage
const pubsub = new PubSub();
// Subscribers
const userHandler = (data) => {
console.log('User handler received:', data);
};
const emailHandler = (data) => {
console.log('Email handler received:', data);
};
const logHandler = (data) => {
console.log('Log handler received:', data);
};
// Subscribe to topics
pubsub.subscribe('user.created', userHandler);
pubsub.subscribe('user.created', emailHandler);
pubsub.subscribe('user.created', logHandler);
pubsub.subscribe('user.updated', userHandler);
pubsub.subscribe('user.updated', logHandler);
// Publish events
pubsub.publish('user.created', { id: 1, name: 'John Doe', email: 'john@example.com' });
pubsub.publish('user.updated', { id: 1, name: 'John Smith', email: 'johnsmith@example.com' });
console.log('Subscribers for user.created:', pubsub.getSubscriberCount('user.created'));
<?php
// PubSub Pattern in PHP
class PubSub {
private $subscribers = [];
public function subscribe($topic, $callback) {
if (!isset($this->subscribers[$topic])) {
$this->subscribers[$topic] = [];
}
$this->subscribers[$topic][] = $callback;
echo "Subscribed to topic: $topic\n";
}
public function unsubscribe($topic, $callback) {
if (isset($this->subscribers[$topic])) {
$index = array_search($callback, $this->subscribers[$topic], true);
if ($index !== false) {
unset($this->subscribers[$topic][$index]);
$this->subscribers[$topic] = array_values($this->subscribers[$topic]);
echo "Unsubscribed from topic: $topic\n";
}
}
}
public function publish($topic, $data) {
if (isset($this->subscribers[$topic])) {
echo "Publishing to topic: $topic\n";
foreach ($this->subscribers[$topic] as $callback) {
try {
$callback($data);
} catch (Exception $e) {
echo "Error in subscriber for topic $topic: " . $e->getMessage() . "\n";
}
}
} else {
echo "No subscribers for topic: $topic\n";
}
}
public function getSubscriberCount($topic) {
return isset($this->subscribers[$topic]) ? count($this->subscribers[$topic]) : 0;
}
}
// Usage
$pubsub = new PubSub();
// Subscribers
$userHandler = function($data) {
echo "User handler received: " . json_encode($data) . "\n";
};
$emailHandler = function($data) {
echo "Email handler received: " . json_encode($data) . "\n";
};
$logHandler = function($data) {
echo "Log handler received: " . json_encode($data) . "\n";
};
// Subscribe to topics
$pubsub->subscribe('user.created', $userHandler);
$pubsub->subscribe('user.created', $emailHandler);
$pubsub->subscribe('user.created', $logHandler);
$pubsub->subscribe('user.updated', $userHandler);
$pubsub->subscribe('user.updated', $logHandler);
// Publish events
$pubsub->publish('user.created', ['id' => 1, 'name' => 'John Doe', 'email' => 'john@example.com']);
$pubsub->publish('user.updated', ['id' => 1, 'name' => 'John Smith', 'email' => 'johnsmith@example.com']);
echo "Subscribers for user.created: " . $pubsub->getSubscriberCount('user.created') . "\n";
?>
// PubSub Pattern in Go
package main
import (
"fmt"
"sync"
)
type Message struct {
Topic string
Data interface{}
}
type Subscriber func(interface{})
type PubSub struct {
subscribers map[string][]Subscriber
mutex sync.RWMutex
}
func NewPubSub() *PubSub {
return &PubSub{
subscribers: make(map[string][]Subscriber),
}
}
func (ps *PubSub) Subscribe(topic string, callback Subscriber) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
ps.subscribers[topic] = append(ps.subscribers[topic], callback)
fmt.Printf("Subscribed to topic: %s\n", topic)
}
func (ps *PubSub) Unsubscribe(topic string, callback Subscriber) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
if subscribers, exists := ps.subscribers[topic]; exists {
for i, sub := range subscribers {
if &sub == &callback {
ps.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
fmt.Printf("Unsubscribed from topic: %s\n", topic)
break
}
}
}
}
func (ps *PubSub) Publish(topic string, data interface{}) {
ps.mutex.RLock()
defer ps.mutex.RUnlock()
if subscribers, exists := ps.subscribers[topic]; exists {
fmt.Printf("Publishing to topic: %s\n", topic)
for _, callback := range subscribers {
go func(cb Subscriber) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Error in subscriber for topic %s: %v\n", topic, r)
}
}()
cb(data)
}(callback)
}
} else {
fmt.Printf("No subscribers for topic: %s\n", topic)
}
}
func (ps *PubSub) GetSubscriberCount(topic string) int {
ps.mutex.RLock()
defer ps.mutex.RUnlock()
if subscribers, exists := ps.subscribers[topic]; exists {
return len(subscribers)
}
return 0
}
// Usage
func main() {
pubsub := NewPubSub()
// Subscribers
userHandler := func(data interface{}) {
fmt.Printf("User handler received: %+v\n", data)
}
emailHandler := func(data interface{}) {
fmt.Printf("Email handler received: %+v\n", data)
}
logHandler := func(data interface{}) {
fmt.Printf("Log handler received: %+v\n", data)
}
// Subscribe to topics
pubsub.Subscribe("user.created", userHandler)
pubsub.Subscribe("user.created", emailHandler)
pubsub.Subscribe("user.created", logHandler)
pubsub.Subscribe("user.updated", userHandler)
pubsub.Subscribe("user.updated", logHandler)
// Publish events
pubsub.Publish("user.created", map[string]interface{}{
"id": 1, "name": "John Doe", "email": "john@example.com",
})
pubsub.Publish("user.updated", map[string]interface{}{
"id": 1, "name": "John Smith", "email": "johnsmith@example.com",
})
fmt.Printf("Subscribers for user.created: %d\n", pubsub.GetSubscriberCount("user.created"))
}