Posts tagged 'prooph'

Prooph query bus

published on December 20, 2017.

Continuing on with the Prooph series, I want to take a look at the query bus of the Prooph service bus component. The query bus provides a way to issue a query (not necessarily a database query!) to a query handler. This handler is then responsible to return a result for our query.

Why would we need a query bus in the first place? While some may argue that the query bus is not really required, it can be a nice addition to complete the CQRS idea. Just as we have a single endpoint to handle all of our commands and events, we have a single endpoint that can handle all the queries.

The query bus allows the handler to do whatever it needs to do to return the result, synchronously or asynchronously. This is achieved by having the query bus return a ReactPHP Promise. The query handler itself will be a deferred unit of work, which allows it to promise to the querier that the query will be resolved or rejected sometime in the future.

Every query message we send with the query bus, must be routed to exactly one query handler on the other side. Of course, multiple query messages can be routed to the same handler.

Prooph’s service bus also supports a plugin system which we can use, for example, to have authorization of commands, events, and queries, logging… But more on that in a future blog post.

A quick example

The query message, just as a command or an event, can be pretty much anything — a primitive like a string or an integer, a custom data transfer object, or a class implementing the Prooph\Common\Messaging\Message interface by extending the Query class from the prooph-common library.

Setting up the query bus for using it is similar to setting up the command bus or the event bus:

  • we create the query bus,
  • we create a query router that the query bus uses to route query messages to query handlers,
  • we route a query message to its query handler,
  • we attach the router to the query bus,
  • and finally, we dispatch the query on the query bus.

Let’s see how it looks like in code:

query-bus.php

<?php declare(strict_types=1);

require_once 'vendor/autoload.php';

use Prooph\ServiceBus\Plugin\Router\QueryRouter;
use Prooph\ServiceBus\QueryBus;

$queryBus = new QueryBus();

$queryRouter = new QueryRouter();

$queryRouter->route('A simple string')
            ->to(new ProophExample\QueryHandler\Primitives());

$queryRouter->attachToMessageBus($queryBus);

$queryBus->dispatch('A simple string')
         ->done(function($result) {
            echo $result . PHP_EOL;
         }, function ($reason) {
            echo $reason . PHP_EOL;
         });

Not much going on but it shows how to set up and use the query bus.

The query handler part of this example looks like this:

src/ProophExample/QueryHandler/Primitives.php

<?php declare(strict_types=1);

namespace ProophExample\QueryHandler;

use React\Promise\Deferred;

class Primitives
{
    public function __invoke(string $query, Deferred $deferred)
    {
        $i = rand(1, 10);

        if ($i % 2 == 0) {
            $deferred->resolve(str_rot13($query));
        } else {
            $deferred->reject("Out of luck");
        }
    }
}

The query handler is an invokable that gets invoked with the string query and a React\Promise\Deferred unit of work, which we use to either resolve or reject the query.

While this example with the primitives gives an overall picture of how to use the query bus, it’s not really useful.

How many open CFPs are on JoindIn?

JoindIn has an open API which we can use to query it about events, like conferences and meetups. I think we can use it to show a better example of the query bus.

We’re going to have a query message that we’ll use to pass the type of the event we’re interested in — all, hot, upcoming, past, cfp — and a query handler that will assemble the URL for the API call and call it with a simple file_get_contents.

The query message for this example looks something like the following:

src/ProophExample/Query/JoindInEvents.php

<?php declare(strict_types=1);
namespace ProophExample\Query;

use Assert\Assertion;

class JoindInEvents
{
    private $type;
    public function __construct(string $type)
    {
        Assertion::choice($type, ['all', 'hot', 'upcoming', 'past', 'cfp']);
        $this->type = $type;
    }

    public function type(): string
    {
        return $this->type;
    }
}

We pass it in a string $type, assert that it is one of the expected values and set it as a class property. Really not much else to it than that.

The query handler will handle that query, issue the API call and resolve the React promise if it manages to decode the JSON response, or reject it if it fails:

src/ProophExample/QueryHandler/JoindInEvents.php

<?php declare(strict_types=1);

namespace ProophExample\QueryHandler;

use ProophExample\Query\JoindInEvents as Query;
use React\Promise\Deferred;

class JoindInEvents
{
    public function __invoke(Query $query, Deferred $deferred)
    {
        $url = 'https://api.joind.in/v2.1/events';

        $eventType = $query->type();

        if ($eventType != 'all') {
            $url .= '?filter=' . $eventType;
        }

        $response = file_get_contents($url);

        $jsonResponse = json_decode($response);

        if ($jsonResponse === null) {
            $deferred->reject("Error decoding json: " . json_last_error_msg());
        }

        $deferred->resolve($jsonResponse);
    }
}

In a real production code we’d probably use a proper HTTP client instead of file_get_contents, do more error checking and stuff, but in only a few lines of code we can see how to create a query handler.

To put it all together and call it, we’d have something like the following example:

query-bus.php

<?php declare(strict_types=1);

require_once 'vendor/autoload.php';

use Prooph\ServiceBus\Plugin\Router\QueryRouter;
use Prooph\ServiceBus\QueryBus;

$queryBus = new QueryBus();

$queryRouter = new QueryRouter();

$queryRouter->route(ProophExample\Query\JoindInEvents::class)
            ->to(new ProophExample\QueryHandler\JoindInEvents());

$queryRouter->attachToMessageBus($queryBus);

$queryBus->dispatch(new ProophExample\Query\JoindInEvents('cfp'))
         ->done(function($result) {
            echo sprintf("There are %d CFPs!", $result->meta->count) . PHP_EOL;
         }, function($reason){
            echo $reason . PHP_EOL;
         });

If the query message was resolved by the query handler we print out how many CFPs are there right now, and if the query handler rejected the query message, we print out the reason of rejection.

As with the command and the event bus, the examples seen here are available in my prooph-examples repository.

Happy hackin’!

P.S.: Thanks to Alexander Miertsch for helping me understand the query bus a little more!

Prooph event bus

published on November 15, 2017.

Let’s continue with the Prooph components, with another part of the Service Bus: the event bus.

As mentioned in the previous article on the command bus, the Prooph Service Bus has three kinds of buses:

  • the command bus,
  • the event bus,
  • and, the query bus.

The event bus takes one event, and that event is sent to all the event listeners that are listening for that event. If there are no listeners, the event bus will still dispatch the event, but it won’t break the application. Any listeners listening to that event, will receive the event, and then we can do something based on that event — update a database table, send an email, push a notification, etc.

Compare that to the command bus, where we send one command on the command bus, and that command must be handled by a registered handler, otherwise the command bus will throw an exception.

Events are messages of things that happened in our system, and we name them accordingly: UserRegistered, CommentAdded, RssFeedUpdated.

From the nature of the events, and from the naming convention, we can conclude that we can not prevent events, as they have already happened. When we create an account for a user, we send an event of that, but we can’t do anything to the event to prevent the creation of the new user account. We can only react to that event.

When working in a DDD fashion, sending events is a great way to notify other bounded contexts of events that happened in our models.

You’re not required to do CQRS, Event Sourcing, and DDD, to be able to use eventing in your applications. It is possible to use the event bus as a standalone component, and if all what you need is to send an event, and then react to that event, do just that. At the very least, you get to decouple the sending of welcome emails, from the actual user registration.

A quick example

The message that represents our event can be anything — a primitive, a custom DTO class, or a class implementing the Message interface from the prooph-common library.

Setting up and using the event bus is similar to setting up and using the command bus:

  • we create the event bus,
  • we create an event router that the event bus uses to route events to event listeners,
  • we route a event to its event listeners,
  • we attach the router to the event bus,
  • and finally, we dispatch the event on the event bus.

Or in code:

event-bus.php

<?php declare(strict_types=1);

require_once 'vendor/autoload.php';

use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Plugin\Router\EventRouter;

$eventBus = new EventBus();

$eventRouter = new EventRouter();

$eventRouter->route('A simple string')
            ->to(new ProophExample\EventListener\Primitives());

$eventRouter->attachToMessageBus($eventBus);

$event = 'A simple string';
$eventBus->dispatch($event);

The Primitives event listener in this case doesn’t do much, and isn’t even named as we would name a real event listener, but it shows how would we create an event listener:

src/ProophExample/EventListener/Primitives.php

<?php declare(strict_types=1);

namespace ProophExample\EventListener;

class Primitives
{
    public function __invoke(string $event)
    {
        echo $event . PHP_EOL;
    }
}

It just outputs the event that it got from the event bus.

Sending a welcome email

A bit more realistic example would be to send a welcome email to a user when they register with us, and to increase the total number of user accounts. Remember, we can have multiple event listeners react to the same event!

We have the UserRegistered event, that will hold the User model that was created during the registration process:

src/ProophExample/Event/UserRegistered.php

<?php declare(strict_types=1);

namespace ProophExample\Event;

use ProophExample\User;

class UserRegistered
{
    /**
    * @var User
    */
    protected $user;

    public function __construct(User $user)
    {
        $this->user = $user;
    }

    public function user(): User
    {
        return $this->user;
    }
}

We also have the event listener responsible for sending out welcome emails:

src/ProophExample/EventListener/SendWelcomeEmail.php

<?php declare(strict_types=1);

namespace ProophExample\EventListener;

use ProophExample\Event\UserRegistered;

class SendWelcomeEmail
{
    public function __invoke(UserRegistered $event)
    {
        echo sprintf("Hello %s", $event->user()->name()) . PHP_EOL;
    }
}

And we have the event listener responsible for increasing the number of user accounts:

src/ProophExample/EventListener/IncreaseNumberOfAccounts.php

<?php declare(strict_types=1);

namespace ProophExample\EventListener;

use ProophExample\Event\UserRegistered;

class IncreaseNumberOfAccounts
{
    public function __invoke(UserRegistered $event)
    {
        echo "Increasing the number of user accounts" . PHP_EOL;
    }
}

When tying all this together, we’d have something like the following example:

event-bus.php

<?php declare(strict_types=1);

require_once 'vendor/autoload.php';

use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Plugin\Router\EventRouter;

$eventBus = new EventBus();

$eventRouter = new EventRouter();

$eventRouter->route(ProophExample\Event\UserRegistered::class)
            ->to(new ProophExample\EventListener\SendWelcomeEmail())
            ->andTo(new ProophExample\EventListener\IncreaseNumberOfAccounts());

$eventRouter->attachToMessageBus($eventBus);

$user = ProophExample\User::register('john.doe@example.com', 'John Doe');

$event = new ProophExample\Event\UserRegistered($user);

$eventBus->dispatch($event);

We create the event bus, the event router, we route the UserRegistered event to the SendWelcomeEmail event listener, and to the IncreaseNumberOfAccounts event listener, attaching the router to the event bus. Next we register our new user, and we create and dispatch our UserRegistered event.

Running this example gives us:

$ php event-bus.php
Hello John Doe
Increasing the number of user accounts

As we can see, first the event listener responsible for sending the welcome email gets invoked, and then the event listener for increasing the number of user accounts. They were invoked in the order we attached them to the router.

Prooph Messages

In the previous article about the command bus, we saw that the messages, that is the commands, can implement the Prooph\Common\Messaging\Message interface. In that section I said that I don’t really see the benefit of having commands implement that interface, but I do think that the events benefit a great deal from that interface.

Why?

By implementing that interface, we get a UUID for that event, a date and a time when it happened, and other information. All of this is of great value because an event listener might handle an event sometimes in the future, whereas we expect a command to be handled immediately. This extra information about events can be especially useful if/when we want to have Event Sourcing in our application.

An example event that signals that a RSS feed has been updated would look something like this implementing the Message interface:

src/ProophExample/Event/FeedUpdated.php

<?php declare(strict_types=1);

namespace ProophExample\Event;

use Prooph\Common\Messaging\DomainEvent;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;
use ProophExample\Url;

class FeedUpdated extends DomainEvent implements PayloadConstructable
{
    use PayloadTrait;

    public function url(): Url
    {
        return $this->payload['url'];
    }
}

The prooph-common library not only provides the interface, but also abstract classes that help us with implementing the methods defined in the interfaces.

Creating and dispatching this event will then be:

event-bus.php

<?php

$url = ProophExample\Url::fromString('https://robertbasic.com/index.xml');
$event = new ProophExample\Event\FeedUpdated(['url' => $url]);

$eventBus->dispatch($event);

And the listener then can access the Url, as well as the extra event information, like the date and time when the event was created:

src/ProophExample/EventListener/NotifyAboutNewArticles.php

<?php declare(strict_types=1);

namespace ProophExample\EventListener;

use ProophExample\Event\FeedUpdated;

class NotifyAboutNewArticles
{
    public function __invoke(FeedUpdated $event)
    {
        echo sprintf("There are new articles to read from %s since %s",
            $event->url(),
            $event->createdAt()->format('Y-m-d H:i:s')
        ) . PHP_EOL;
    }
}

A more real-world like example

Same as with the command bus, we wouldn’t really use the event bus as we see it in this event-bus.php example file.

We would maybe have a factory of some kind that would create the event bus, configure the event router, and attach it to the event bus. Then we would get the event bus from a psr/container compatible container where we need it, create the event, and then dispatch it on the event bus. I’ve already given an example of this in the previous article, so I don’t want to repeat myself here.

The examples shown and discussed here are available in my prooph-examples repository.

Happy hackin’!

Prooph command bus

published on November 07, 2017.

Prooph is a CQRS and Event Sourcing component for PHP, and as they state on their website:

Prooph components include everything to get you started with CQRS and Event Sourcing.

CQRS and Event Sourcing go hand in hand with Domain Driven Design, but can be used outside of DDD too. They are patterns and methodologies that are here to help us make complicated and complex software designs more manageable, and all around better. Or make them even more complicated and complex.

In any case, I believe DDD is the way to go forward, as it puts communication with business stakeholders front and center, and at the end of the day, communication is the key to the success of any software project.

A tiny drop of theory

CQRS stands for Command Query Responsibility Segregation.

It boils down to the idea that instead of having one model that does both writing to and reading from the storage layer, you instead split them in two separate models. Then one of those models is responsible only for writing, and the other model is responsible for reading. The write “side” handles the command part, and the read “side” handles the query part of these responsibilities.

If you’re interested in more theory around this, and you should be, read this article on CQRS by Martin Fowler and this Clarified CQRS article by Udi Dahan. The CQRS journey on MSDN and the CQRS pattern documentation helped me a great deal to get a better understanding of this topic.

As for Event Sourcing… We’ll get to that in another blog post, when we’ll talk about the Event Sourcing part of Prooph.

The command bus

Now, let’s get started with Prooph. The first component we’re going to look at is the Service Bus.

The service bus offers a messaging system between the application and the domain layer. It allows us to send, or dispatch, a message on this service bus, and then to have handlers on the other side of the service bus that we’ll use to, well, handle these messages.

Prooph’s service bus has three different kinds of buses:

  • the command bus — it dispatches one message, a command, to exactly one handler,
  • the event bus — it dispatches one message, an event, to zero or more event handlers,
  • and, the query bus — it dispatches one message, a query, to exactly one handler, but returns a React\Promise\Promise.

Today we’re going to look at — you’ve guessed it! — the command bus.

The command bus gives us the ability to send a command through the command bus itself, and dispatches that command to a command handler we specified. We send in a message, and on the otherside that same message comes out to the command handler.

It is worth mentioning that the command bus can be used as a standalone component, if you’re interested only in that part. You’re not required to do CQRS, Event Sourcing, and/or DDD, to be able to use the command bus. If all you want, or all you need, to do is send a command, and have that command handled on the other side, by all means, do just that.

The command bus can dispatch anything as a command: a primitive like a string or an integer, a Data Transfer Object (DTO) that represents our command, or a Prooph Message (an interface found in the prooph-common library).

We name these commands based on the action that we want to do: RegisterUser, FetchUrl, SendEmail.

To dispatch a command on the command bus, we do the following:

  • we create the command bus,
  • we create a command router that the command bus uses to route commands to command handlers,
  • we route a command to its command handler,
  • we attach the router to the command bus,
  • and finally, we dispatch the command on the command bus.

This sounds like an awful lot; a picture code example is worth a thousand words:

command-bus.php

<?php declare(strict_types=1);

require_once 'vendor/autoload.php';

use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;

$commandBus = new CommandBus();

$commandRouter = new CommandRouter();

$commandRouter->route('A simple string')
    ->to(new \ProophExample\CommandHandler\Primitives());

$commandRouter->attachToMessageBus($commandBus);

$commandBus->dispatch('A simple string');

The Primitives command handler is an invokable that, for this example, only prints out the “primitive” command we dispatched to it for handling:

src/ProophExample/CommandHandler/Primitives.php

<?php declare(strict_types=1);

namespace ProophExample\CommandHandler;

class Primitives
{
    public function __invoke(string $primitiveCommand)
    {
        echo $primitiveCommand . PHP_EOL;
    }
}

In a real application it would do something a bit more meaningful.

If we run this command-bus.php example, we’d see this:

$ php command-bus.php
A simple string

If we’d tell the command bus to dispatch something else instead of 'A simple string':

<?php
$commandBus->dispatch('Some other string');

and we run the example script again, we’d get the following exception:

Prooph\ServiceBus\Exception\RuntimeException: CommandBus was not able to identify a CommandHandler for command Some other string

That’s because we told the $commandRouter to route the command 'A simple string', yet we dispatched 'Some other string'. Remember, every dispatched command must be handled by exactly one command handler, and in this case the command bus doesn’t know how to handle our command.

Going past primitives

Except for showing examples, I don’t think primitives as commands are really useful.

How I personally use the command bus, is by creating classes of commands, which are nothing else but DTOs:

src/ProophExample/Command/FetchUrl.php

<?php declare(strict_types=1);

namespace ProophExample\Command;

use ProophExample\Url;

class FetchUrl
{
    /**
    * @var Url
    */
    protected $url;

    public function __construct(string $url)
    {
        $this->url = Url::fromString($url);
    }

    public function url(): Url
    {
        return $this->url;
    }
}

A command is a good place to convert our primitives to value objects!

The accompanying command handler is:

src/ProophExample/CommandHandler/FetchUrl.php

<?php declare(strict_types=1);

namespace ProophExample\CommandHandler;

use ProophExample\Command;

class FetchUrl
{
    public function __invoke(Command\FetchUrl $command)
    {
        echo sprintf("Fetching url: %s", $command->url()) . PHP_EOL;
    }
}

Again, it doesn’t do much besides printing out the url that our command DTO transferred for us across the command bus.

The command bus follows the same principle: tell the command router what command to route to what command handler, create the command, and dispatch it on the command bus:

command-bus.php

<?php declare(strict_types=1);

require_once 'vendor/autoload.php';

use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;

$commandBus = new CommandBus();

$commandRouter = new CommandRouter();

$commandRouter->route(ProophExample\Command\FetchUrl::class)
    ->to(new ProophExample\CommandHandler\FetchUrl());

$commandRouter->attachToMessageBus($commandBus);

$url = 'https://robertbasic.com/index.xml';
$command = new ProophExample\Command\FetchUrl($url);

$commandBus->dispatch($command);

Prooph Messages

As mentioned earlier, the commands can also be Prooph Messages. These are commands that implement the Prooph\Common\Messaging\Message interface.

Note that the prooph-common library not only provides us the interface(s) we should implement, but also some abstract classes and traits to do the “plumbing” for us.

Let’s see how what would this be like:

src/ProophExample/Command/RegisterUser.php

<?php declare(strict_types=1);

namespace ProophExample\Command;

use Prooph\Common\Messaging\Command;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;
use ProophExample\Email;

class RegisterUser extends Command implements PayloadConstructable
{
    use PayloadTrait;

    public function email(): Email
    {
        return Email::fromString($this->payload['email']);
    }
}

The two interfaces, Message and HasMessageName, together with the Command abstract class, and the DomainMessage abstract class it extends, provide a type for our message (command in this case), a UUID, a date and time when the command was created, the payload of the command, and some meta data.

The PayloadConstructable interface and the PayloadTrait trait give us an implementation of a constructor that expects exactly one argument, an array, that holds the payload for our command.

To create this command, we do the following:

<?php
$payload = ['email' => 'john.doe@example.com'];
$command = new ProophExample\Command\RegisterUser($payload);

In the case of commands, I personally prefer a custom DTO, over a Message type.

A more real-world like example

The command-bus.php example from before doesn’t really show how would we use the command bus in a more real-life setting. When we want to dispatch a command somewhere in our application, we don’t want to deal with all the routing and stuff, we just want to send a command to the command bus to be handled by a command handler.

If we’re using Symfony, one option would be to create a custom factory for the command bus, where we create the command bus, the router for it, and route the commands to command handlers:

src/ProophExample/CommandBusFactory.php

<?php declare(strict_types=1);

namespace ProophExample;

use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\Plugin\Router\CommandRouter;
use Symfony\Component\DependencyInjection\ContainerInterface;

class CommandBusFactory
{
    public static function createCommandBus(ContainerInterface $container): CommandBus
    {
        $commandBus = new CommandBus();

        $router = new CommandRouter();

        $router->route(ProophExample\Command\FetchUrl::class)
            ->to($container->get(ProophExample\CommandHandler\FetchUrl::class));

        $router->attachToMessageBus($commandBus);

        return $commandBus;
    }
}

The relevant part in the service definition file would be:

app/config/services.xml

<service id="Prooph\ServiceBus\CommandBus" class="Prooph\ServiceBus\CommandBus">
    <factory service="ProophExample\CommandBusFactory" method="createCommandBus" />
    <argument type="service" id="service_container" />
</service>

Then somewhere in our application, for example in a controller, we can get the CommandBus from the container, and dispatch the command:

src/AppBundle/Controller/ExampleController.php

<?php
// namespace imports left out intentionally
class ExampleController extends Controller
{
    public function indexAction(Request $request)
    {
        $url = 'https://robertbasic.com/index.xml';
        $command = new ProophExample\Command\FetchUrl($url);

        $this->get(Prooph\ServiceBus\CommandBus::class)->dispatch($command);
    }
}

The Prooph ServiceBus also comes equipped with a psr/container compatible Prooph\ServiceBus\Container\CommandBusFactory factory. The proophesor-do application has an example how to configure and use it.

There’s also a Symfony bundle that provides integration of the ServiceBus with Symfony.

Some of the examples shown and discussed here are available in my prooph-examples repository.

Happy hackin’!

Robert Basic

Robert Basic

Software engineer, consultant, open source contributor.

Let's work together!

If you require outsourcing or consulting help on your projects, I'm available!

Robert Basic © 2008 — 2018
Get the feed