How to process data in RabbitMQ: single messages and collections of data

    By: Manu
    5 months ago

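    In this example, each message is published one at a time: every element of an array becomes its own message on the exchange, routed with routing_key_A or routing_key_B. This lets you see how each piece of data is handled individually.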


    <?php
    require 'connection.php';
    
    
    try {
        $channel = new AMQPChannel($connection);
        $exchange = new AMQPExchange($channel);
        $exchange->setName('Test1');
        $exchange->setFlags(AMQP_DURABLE);
        $exchange->setType(AMQP_EX_TYPE_DIRECT); // Set the exchange type to 'direct'
        $exchange->declareExchange(); // Create the exchange on the broker if it does not exist yet
    
    
        $data = [
            "data for routing key a " . time(),
            "data for routing key a " . time(),
            "data for routing key a " . time(),
            "data for routing key a " . time(),
        ];
    
    
        $data2 = [
            "data for routing key b " . time(),
            "data for routing key b " . time(),
            "data for routing key b " . time(),
            "data for routing key b " . time(),
        ];
    
    
        if (!empty($data)) {
            foreach ($data as $message) {
                $exchange->publish($message, 'routing_key_A');
            }
        }
    
    
        if (!empty($data2)) {
            foreach ($data2 as $message2) {
                $exchange->publish($message2, 'routing_key_B');
            }
        }
    } catch (AMQPConnectionException | AMQPExchangeException | AMQPChannelException $e) {
        var_dump($e->getMessage());
    }
    


    If you want to send a whole array at once instead of one message at a time, you can change your code as shown below.


    <?php
    require 'connection.php';
    
    
    try {
        $channel = new AMQPChannel($connection);
        $exchange = new AMQPExchange($channel);
        $exchange->setName('Test1');
        $exchange->setFlags(AMQP_DURABLE);
        $exchange->setType(AMQP_EX_TYPE_DIRECT); // Set the exchange type to 'direct'
        $exchange->declareExchange(); // Create the exchange on the broker if it does not exist yet
    
    
        $data = [
            "data for routing key a " . time(),
            "data for routing key a " . time(),
            "data for routing key a " . time(),
            "data for routing key a " . time(),
        ];
    
    
        $data2 = [
            "data for routing key b " . time(),
            "data for routing key b " . time(),
            "data for routing key b " . time(),
            "data for routing key b " . time(),
        ];
    
    
        // The php-amqp extension has no batch-publish method, so each array
        // is JSON-encoded and published as one single message.
        if (!empty($data)) {
            $exchange->publish(json_encode($data), 'routing_key_A');
        }
    
    
        if (!empty($data2)) {
            $exchange->publish(json_encode($data2), 'routing_key_B');
        }
    } catch (AMQPConnectionException | AMQPExchangeException | AMQPChannelException $e) {
        var_dump($e->getMessage());
    }
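
    On the consuming side, the same queue may then receive either a single plain-text message or a whole collection. Below is a minimal sketch of how both cases could be handled, assuming a collection arrives as one JSON-encoded array. The function name decodePayload is hypothetical and is not part of the AMQP extension.

```php
<?php
// Hypothetical helper (not part of the AMQP extension): turn a message body
// into a list of items, whether it carries one message or a collection.
function decodePayload(string $body): array
{
    // Decode JSON objects as associative arrays; invalid JSON yields null.
    $decoded = json_decode($body, true);

    // Assumption: a collection was JSON-encoded by the publisher.
    if (is_array($decoded)) {
        return array_values($decoded);
    }

    // Anything else is treated as one plain-text message.
    return [$body];
}

// Both kinds of body can now be processed with the same loop.
foreach (decodePayload('["first","second"]') as $item) {
    echo $item, PHP_EOL;
}
```

    Inside a consumer callback you could pass $envelope->getBody() to this helper and loop over the result, so single messages and collections share one code path.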
    
    


    I hope this guide helps.