PubSub (Издатель-подписчик)

Паттерн проектирования Active Record

Паттерн проектирования PubSub

Описание PubSub

Издатель-подписчик (англ. PubSub) —поведенческий шаблон проектирования передачи сообщений, в котором отправители сообщений, именуемые издателями (англ. publishers), напрямую не привязаны программным кодом отправки сообщений к подписчикам (англ. subscribers). Вместо этого сообщения делятся на классы и не содержат сведений о своих подписчиках, если таковые есть. Аналогичным образом подписчики имеют дело с одним или несколькими классами сообщений, абстрагируясь от конкретных издателей..

Шаблон издатель-подписчик представляет собой расширение шаблона наблюдатель, в который добавлено описание канала событий (англ. event channel), специально предназначенного для оповещения о событиях.

Структура.

В модели издатель-подписчик подписчики обычно получают только подмножество всех опубликованных сообщений. Процесс отбора сообщений для получения и их обработка называется фильтрацией. Существуют две основных формы фильтрации: основанная на теме (англ. topic) и основанная на содержимом.

В системе, основанной на теме, сообщения публикуются в «темах» или именованных логических каналах. Подписчики в таких системах будут получать все сообщения, опубликованные в темах, на которые они подписались, и все подписчики, подписавшиеся на одну и ту же тему, будут получать те же самые сообщения. Издатель отвечает за определение классов сообщений, на которые подписываются подписчики.

В системе, основанной на содержимом, сообщения доставляются подписчикам только в том случае, если атрибуты или содержимое этих сообщений допускаются подписчиком. В данной системе подписчик отвечает за классификацию сообщений.

Некоторые системы представляют собой гибрид между этими двумя системами: издатель отправляет сообщения в тему, в то время как подписчики регистрируют подписку, основанную на содержимом для одной или более тем.

Паттерн описан Андреем Болониным.

Примеры реализации

// PubSub Pattern in JavaScript
class PubSub {
    constructor() {
        this.subscribers = new Map();
    }
    
    subscribe(topic, callback) {
        if (!this.subscribers.has(topic)) {
            this.subscribers.set(topic, []);
        }
        this.subscribers.get(topic).push(callback);
        console.log(`Subscribed to topic: ${topic}`);
    }
    
    unsubscribe(topic, callback) {
        if (this.subscribers.has(topic)) {
            const callbacks = this.subscribers.get(topic);
            const index = callbacks.indexOf(callback);
            if (index > -1) {
                callbacks.splice(index, 1);
                console.log(`Unsubscribed from topic: ${topic}`);
            }
        }
    }
    
    publish(topic, data) {
        if (this.subscribers.has(topic)) {
            const callbacks = this.subscribers.get(topic);
            console.log(`Publishing to topic: ${topic}`, data);
            callbacks.forEach(callback => {
                try {
                    callback(data);
                } catch (error) {
                    console.error(`Error in subscriber for topic ${topic}:`, error);
                }
            });
        } else {
            console.log(`No subscribers for topic: ${topic}`);
        }
    }
    
    getSubscriberCount(topic) {
        return this.subscribers.has(topic) ? this.subscribers.get(topic).length : 0;
    }
}

// Usage
const pubsub = new PubSub();

// Subscribers
const userHandler = (data) => {
    console.log('User handler received:', data);
};

const emailHandler = (data) => {
    console.log('Email handler received:', data);
};

const logHandler = (data) => {
    console.log('Log handler received:', data);
};

// Subscribe to topics
pubsub.subscribe('user.created', userHandler);
pubsub.subscribe('user.created', emailHandler);
pubsub.subscribe('user.created', logHandler);

pubsub.subscribe('user.updated', userHandler);
pubsub.subscribe('user.updated', logHandler);

// Publish events
pubsub.publish('user.created', { id: 1, name: 'John Doe', email: 'john@example.com' });
pubsub.publish('user.updated', { id: 1, name: 'John Smith', email: 'johnsmith@example.com' });

console.log('Subscribers for user.created:', pubsub.getSubscriberCount('user.created'));
<?php
// PubSub Pattern in PHP
class PubSub {
    private $subscribers = [];
    
    public function subscribe($topic, $callback) {
        if (!isset($this->subscribers[$topic])) {
            $this->subscribers[$topic] = [];
        }
        $this->subscribers[$topic][] = $callback;
        echo "Subscribed to topic: $topic\n";
    }
    
    public function unsubscribe($topic, $callback) {
        if (isset($this->subscribers[$topic])) {
            $index = array_search($callback, $this->subscribers[$topic], true);
            if ($index !== false) {
                unset($this->subscribers[$topic][$index]);
                $this->subscribers[$topic] = array_values($this->subscribers[$topic]);
                echo "Unsubscribed from topic: $topic\n";
            }
        }
    }
    
    public function publish($topic, $data) {
        if (isset($this->subscribers[$topic])) {
            echo "Publishing to topic: $topic\n";
            foreach ($this->subscribers[$topic] as $callback) {
                try {
                    $callback($data);
                } catch (Exception $e) {
                    echo "Error in subscriber for topic $topic: " . $e->getMessage() . "\n";
                }
            }
        } else {
            echo "No subscribers for topic: $topic\n";
        }
    }
    
    public function getSubscriberCount($topic) {
        return isset($this->subscribers[$topic]) ? count($this->subscribers[$topic]) : 0;
    }
}

// Usage
$pubsub = new PubSub();

// Subscribers
$userHandler = function($data) {
    echo "User handler received: " . json_encode($data) . "\n";
};

$emailHandler = function($data) {
    echo "Email handler received: " . json_encode($data) . "\n";
};

$logHandler = function($data) {
    echo "Log handler received: " . json_encode($data) . "\n";
};

// Subscribe to topics
$pubsub->subscribe('user.created', $userHandler);
$pubsub->subscribe('user.created', $emailHandler);
$pubsub->subscribe('user.created', $logHandler);

$pubsub->subscribe('user.updated', $userHandler);
$pubsub->subscribe('user.updated', $logHandler);

// Publish events
$pubsub->publish('user.created', ['id' => 1, 'name' => 'John Doe', 'email' => 'john@example.com']);
$pubsub->publish('user.updated', ['id' => 1, 'name' => 'John Smith', 'email' => 'johnsmith@example.com']);

echo "Subscribers for user.created: " . $pubsub->getSubscriberCount('user.created') . "\n";
?>
// PubSub Pattern in Go
package main

import (
    "fmt"
    "sync"
)

type Message struct {
    Topic string
    Data  interface{}
}

type Subscriber func(interface{})

type PubSub struct {
    subscribers map[string][]Subscriber
    mutex       sync.RWMutex
}

func NewPubSub() *PubSub {
    return &PubSub{
        subscribers: make(map[string][]Subscriber),
    }
}

func (ps *PubSub) Subscribe(topic string, callback Subscriber) {
    ps.mutex.Lock()
    defer ps.mutex.Unlock()
    
    ps.subscribers[topic] = append(ps.subscribers[topic], callback)
    fmt.Printf("Subscribed to topic: %s\n", topic)
}

func (ps *PubSub) Unsubscribe(topic string, callback Subscriber) {
    ps.mutex.Lock()
    defer ps.mutex.Unlock()
    
    if subscribers, exists := ps.subscribers[topic]; exists {
        for i, sub := range subscribers {
            if &sub == &callback {
                ps.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
                fmt.Printf("Unsubscribed from topic: %s\n", topic)
                break
            }
        }
    }
}

func (ps *PubSub) Publish(topic string, data interface{}) {
    ps.mutex.RLock()
    defer ps.mutex.RUnlock()
    
    if subscribers, exists := ps.subscribers[topic]; exists {
        fmt.Printf("Publishing to topic: %s\n", topic)
        for _, callback := range subscribers {
            go func(cb Subscriber) {
                defer func() {
                    if r := recover(); r != nil {
                        fmt.Printf("Error in subscriber for topic %s: %v\n", topic, r)
                    }
                }()
                cb(data)
            }(callback)
        }
    } else {
        fmt.Printf("No subscribers for topic: %s\n", topic)
    }
}

func (ps *PubSub) GetSubscriberCount(topic string) int {
    ps.mutex.RLock()
    defer ps.mutex.RUnlock()
    
    if subscribers, exists := ps.subscribers[topic]; exists {
        return len(subscribers)
    }
    return 0
}

// Usage
func main() {
    pubsub := NewPubSub()
    
    // Subscribers
    userHandler := func(data interface{}) {
        fmt.Printf("User handler received: %+v\n", data)
    }
    
    emailHandler := func(data interface{}) {
        fmt.Printf("Email handler received: %+v\n", data)
    }
    
    logHandler := func(data interface{}) {
        fmt.Printf("Log handler received: %+v\n", data)
    }
    
    // Subscribe to topics
    pubsub.Subscribe("user.created", userHandler)
    pubsub.Subscribe("user.created", emailHandler)
    pubsub.Subscribe("user.created", logHandler)
    
    pubsub.Subscribe("user.updated", userHandler)
    pubsub.Subscribe("user.updated", logHandler)
    
    // Publish events
    pubsub.Publish("user.created", map[string]interface{}{
        "id": 1, "name": "John Doe", "email": "john@example.com",
    })
    pubsub.Publish("user.updated", map[string]interface{}{
        "id": 1, "name": "John Smith", "email": "johnsmith@example.com",
    })
    
    fmt.Printf("Subscribers for user.created: %d\n", pubsub.GetSubscriberCount("user.created"))
}