Skip to content

DDD Redux Toolkit

Ivgeni edited this page Sep 8, 2023 · 15 revisions

Domain Events

/**
 * Domain event creator for the 'conversation/message/added' event type.
 * On this page it is invoked as (messageId, conversationId, senderId) by the
 * conversation root's `addMessage` action and consumed by `handleMessageAdded`.
 */
const messageAddedToConversation = createDomainEvent('conversation/message/added');

Domain Errors

// dispatching any one of these will cause an exception to be thrown
// Dispatched by the `participant` policy as (sender, conversation) when the sender
// is not found among the conversation's participants.
const userIsNotParticipantInConversation = createDomainError('conversation/message/add/user_not_participant');
// Dispatched by `addMessageToConversation` as (senderId) when no user is found for that id.
const userDoesNotExist = createDomainError('user/does_not_exist');
// Dispatched as (conversationId) by the services on this page when no conversation is found for that id.
const conversationDoesNotExist = createDomainError('conversation/does_not_exist');

Values

/**
 * Value definition registered under the name 'message/content'.
 * It declares a single field, `text`, described by the literal 'string'.
 * NOTE(review): presumably createValue interprets 'string' as a type descriptor
 * rather than a default value — confirm against the createValue contract.
 */
const messageContent = createValue({
    name: 'message/content',
    value: {
        // the only field carried by a message's content
        text: 'string'
    }
});

Policies

/**
 * Policy 'conversation/message/add/participant'.
 *
 * Looks the sender up among the conversation's participants by id and, when no
 * entry is found, dispatches `userIsNotParticipantInConversation(sender, conversation)`.
 * When the sender is found the policy does nothing.
 */
const participant = createPolicy(
    'conversation/message/add/participant',
    (sender, conversation, policyAPI) => {
        const membership = conversation.participants.selectById(sender.id);

        // Sender is a known participant: no errors to dispatch, we're clear.
        if (membership !== undefined) {
            return;
        }

        policyAPI.dispatch(userIsNotParticipantInConversation(sender, conversation));
    }
);

Entities

import { getAllUserIds, entityAdapter as conversationAdapter } from '../slices/conversation'; // getAllUserIds is a selector
import { messageAdapter } from '../slices/message';

/**
 * Entity definition for a single message ('message' / 'messages').
 * Reads go through the selectors of `messageAdapter`, scoped to `state.messages`;
 * writes go through `messageAdapter` itself.
 *
 * NOTE(review): `conversation` is declared with `const` further down this page, so
 * if these snippets live in one module this reference is evaluated before that
 * declaration — confirm how the circular message <-> conversation relation is
 * meant to be resolved.
 * NOTE(review): `user` is not defined anywhere on this page; presumably another
 * entity definition — confirm.
 */
const message = createEntity({
    name: 'message',
    plural: 'messages',
    // selectors read from the `messages` slice of the store state
    selectors: messageAdapter.getSelectors(state => state.messages),
    mutators: messageAdapter,
    // a message points at one conversation and one sender
    belongsTo: {
        conversation,
        sender: user
    },
    // per-message state is described by the `messageContent` value
    state: {
        messageContent
    }
});

/**
 * Conversation aggregate. Its root is the 'conversation' entity and it also
 * contains the `message` entity.
 *
 * The root exposes one action, `addMessage`, the selectors of
 * `conversationAdapter` (scoped to `state.conversations`) plus `getAllUserIds`,
 * and uses `conversationAdapter` for mutations.
 *
 * NOTE(review): `user` is not defined anywhere on this page; presumably another
 * entity definition — confirm.
 */
const conversation = createAggregate({
    root: createEntity({
        name: 'conversation',
        plural: 'conversations',
        actions: {
            /**
             * Adds a message from `sender` to this conversation, then announces it.
             *
             * @param sender - entity whose `id` is stored as the message's `senderId`
             * @param messageContent - content stored on the new message
             * @param entityAPI - gives access to the current entity, `dispatch`, and the `messages` mutators
             */
            addMessage: async (sender, messageContent, entityAPI) => {
                // NOTE(review): `message` here is the message entity definition, not the
                // `messageContent` argument — confirm this is what generateId expects.
                const id = generateId(sender, message);
                const payload = {
                    id,
                    conversationId: entityAPI.entity().id,
                    senderId: sender.id,
                    messageContent
                };

                // lets the underlying Redux Toolkit's EntityAdapter handle persistence to the store
                await entityAPI.dispatch(entityAPI.messages.addOne(payload));

                // let the rest of the system know we have a new message in a conversation by a sender
                await entityAPI.dispatch(messageAddedToConversation(id, entityAPI.entity().id, sender.id));
            }
        },
        selectors: {
            ...conversationAdapter.getSelectors(state => state.conversations),
            getAllUserIds
        },
        mutators: conversationAdapter,
        hasMany: {
            message,
            participants: user
        }
    }), // closes the createEntity({ ... }) call opened at `root:`
    entities: [ message ]
});

Domain Services

/**
 * Domain service 'conversation/message/add'.
 *
 * Resolves the conversation and the sender by id. If either lookup comes back
 * undefined, the matching domain error is dispatched (conversation first, then
 * user). Otherwise the `participant` policy is dispatched, followed by the
 * conversation's own `addMessage` action.
 *
 * @param senderId - id used to look up the sending user
 * @param conversationId - id used to look up the target conversation
 * @param messageContent - content handed to the conversation's `addMessage` action
 * @param serviceAPI - provides `dispatch` and the `conversations` / `users` selectors
 */
const addMessageToConversation = createService(
    'conversation/message/add',
    async (senderId: number, conversationId: number, messageContent: MessageContent, serviceAPI) => {
        const targetConversation = serviceAPI.conversations.selectById(conversationId);
        const author = serviceAPI.users.selectById(senderId);

        if (targetConversation === undefined) {
            await serviceAPI.dispatch(conversationDoesNotExist(conversationId));
        }
        if (author === undefined) {
            await serviceAPI.dispatch(userDoesNotExist(senderId));
        }
        if (targetConversation === undefined || author === undefined) {
            return;
        }

        // Both lookups succeeded: run the participation policy first...
        await serviceAPI.dispatch(participant(author, targetConversation));

        // ...then let the entity handle the dirty work
        await serviceAPI.dispatch(targetConversation.addMessage(author, messageContent));
    }
);

Application Services

// Application Service for transactionally adding a message to a conversation and notifying users
// If an exception is thrown, the transaction is rolled back and the error is dispatched
/**
 * Application service 'conversation/message/add/notify'.
 *
 * Dispatches the `addMessageToConversation` domain service, then looks the
 * conversation up again. If it is found, every user id returned by
 * `getAllUserIds()` is notified through `notifyUsers`; if it is not found,
 * `conversationDoesNotExist` is dispatched instead.
 *
 * The second argument is the rollback callback, which receives the error.
 */
const addMessageAndNotifyUsers = createApplicationService(
    'conversation/message/add/notify',
    // All domain events are queued instead of being dispatched when this function is running
    async (senderId: number, conversationId: number, messageContent: MessageContent, serviceAPI) => {
        // Use the domain service to add a message
        await serviceAPI.dispatch(addMessageToConversation(senderId, conversationId, messageContent));

        const updatedConversation = serviceAPI.conversations.selectById(conversationId);

        if (updatedConversation === undefined) {
            await serviceAPI.dispatch(conversationDoesNotExist(conversationId));
            return;
        }

        // Fetch the IDs of all users involved in this conversation
        const recipientIds = updatedConversation.getAllUserIds();

        // Uses the domain service to notify users
        const notice = `New message in conversation ${conversationId}`;
        await serviceAPI.dispatch(notifyUsers(recipientIds, notice));
        // If we made it to this point, the queued up domain events will be dispatched as a batch here.
    },
    (error) => {
        // custom rollback logic
    }
);
  

/* the above is the same as:
const addMessageAndNotifyUsers = createService(
    'conversation/message/add/notify',
    async (senderId: number, conversationId: number, messageContent: MessageContent, serviceAPI) => {
    
        // from this point on, domain events are recorded but not dispatched
        // until the transaction is committed.
        const transaction = serviceAPI.startTransaction(error => {
            // custom rollback logic
        });

        try {
            // Use the domain service to add a message
            await serviceAPI.dispatch(addMessageToConversation(senderId, conversationId, messageContent));

            const conversation = serviceAPI.conversations.selectById(conversationId);

            if (conversation !== undefined) {
                // Fetch the IDs of all users involved in this conversation
                const allUserIds = conversation.getAllUserIds();

                // Uses the domain service to notify users
                await serviceAPI.dispatch(notifyUsers(allUserIds, `New message in conversation ${conversationId}`));
            }
            if (conversation === undefined) {
                await serviceAPI.dispatch(conversationDoesNotExist(conversationId));
            }

            // Commit the transaction. All domain events are dispatched here
            transaction.commit();
        } catch (error) {
            // Rollback and report the error
            transaction.rollback(error);
            serviceAPI.dispatch(serviceAPI.error(error));
        }
    }
);*/

Event Listeners/Sagas

/**
 * Event handler bound to `messageAddedToConversation`.
 *
 * In order, the handler: logs the ids it received, cancels other active
 * handlers, waits on a condition and dispatches a follow-up action if the
 * condition resolves truthy, then forks a child task and logs its value when
 * the task finishes with status 'ok'.
 *
 * NOTE(review): `matchSomeOtherActionOrState`, `someOtherAction` and
 * `additionalData` are placeholders that are not defined on this page.
 */
const handleMessageAdded = createEventHandler({
    event: messageAddedToConversation,
    handler: async (messageId, conversationId, senderId, handlerAPI) => {
        // Logging details about the message added
        const details = `Message ID: ${messageId}, Conversation ID: ${conversationId}, Sender ID: ${senderId}`;
        console.log(details);

        // Cancel any other running handlers if needed
        handlerAPI.cancelActiveHandlers();

        // Use condition to wait for some other action or state change
        const conditionMet = await handlerAPI.condition(matchSomeOtherActionOrState);
        if (conditionMet) {
            // Perform some action based on the condition being met
            await handlerAPI.dispatch(someOtherAction(additionalData));
        }

        // You could also spawn "child tasks" as described in the original listenerMiddleware
        const childTask = handlerAPI.fork(async (forkAPI) => {
            // NOTE(review): the unit of this delay is not shown here — presumably milliseconds; confirm.
            await forkAPI.delay(5);
            return 42;
        });

        const outcome = await childTask.result;
        if (outcome.status !== 'ok') {
            return;
        }
        console.log('Child succeeded: ', outcome.value);
    },
});