使用核心api消费后,HornetQ消息仍然保留在队列中

| 我是HornetQ的新手,请多多包涵。首先说明我的需求: 我需要一个消息队列中间件,能够以低延迟、持久化的方式在不同进程之间传递大小约为1k的消息(即消息在系统崩溃后仍应保留)。会有多个进程写入同一队列,同样也会有多个进程从同一队列读取。 为此我选择了HornetQ,因为它在持久化消息传递方面的评价最好。 我目前使用HornetQ v2.2.2.Final作为独立服务器。 我能够使用核心api(ClientSession)成功创建持久/非持久队列,并成功将消息发布到队列(ClientProducer)。 同样,我也能够使用核心api(ClientConsumer)从队列中读取消息。 问题出在这之后:客户端读取消息后,消息仍保留在队列中,即队列中的消息数保持不变。也许是我理解有误,但我的印象是,消息一旦被消费(读取+确认),就会从队列中删除,不是这样吗? 另外需要说明的是,我已经尝试过将非持久队列与非持久消息一起使用,但问题仍然存在。 我正在使用的生产者代码:
public class HQProducer implements Runnable {

    private ClientProducer producer;
    private boolean killme;
    private ClientSession session;
    private boolean durableMsg;

    public HQProducer(String host, int port, String address, String queueName,
            boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
        this.durableMsg = durableMsg;
        try {
            HashMap map = new HashMap();
            map.put(\"host\", host);
            map.put(\"port\", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            if (queueExists(queueName)) {
                if (deleteQ) {
                    System.out.println(\"Deleting existing queue :: \" + queueName);
                    session.deleteQueue(queueName);
                    System.out.println(\"Creating queue :: \" + queueName);
                    session.createQueue(address, queueName, true);
                }
            } else {
                System.out.println(\"Creating new  queue :: \" + queueName);
                session.createQueue(address, queueName, durable);
            }
            producer = session.createProducer(SimpleString.toSimpleString(address), pRate);

            killme = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killme) {
            try {
                ClientMessage message = session.createMessage(durableMsg);

                message.getBodyBuffer().writeString(\"Hello world\");

                producer.send(message);
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println(\"Producer tps :: \" + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killme) {
        this.killme = killme;
    }

    private boolean queueExists(String qname) {
        boolean res = false;
        try {
            //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
            QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
            if (queueQuery.isExists()) {
                res = true;
            }
        } catch (HornetQException ex) {
            res = false;
        }
        return res;
    }
}
消费者的代码如下:
public class HQConsumer implements Runnable {

    private ClientSession session;
    private ClientConsumer consumer;
    private boolean killMe;

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
        try {
            HashMap map = new HashMap();
            map.put(\"host\", host);
            map.put(\"port\", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            session.start();

            consumer = session.createConsumer(queueName, \"\",0,-1,browseOnly);

            killMe = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killMe) {
            try {
                ClientMessage msgReceived = consumer.receive();
                msgReceived.acknowledge();
                //System.out.println(\"message = \" + msgReceived.getBodyBuffer().readString());
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println(\"ConSumer tps :: \" + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killMe) {
        this.killMe = killMe;
    }
}
HornetQ服务器配置:
<!-- Standalone HornetQ server configuration: storage locations, Netty transports,
     security permissions and default address settings. -->
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <!-- On-disk locations; each defaults to a folder under ../data unless data.dir is set. -->
   <paging-directory>${data.dir:../data}/paging</paging-directory>

   <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

   <journal-directory>${data.dir:../data}/journal</journal-directory>

   <journal-min-files>10</journal-min-files>

   <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

   <!-- Connector definitions: "netty" on port 5445 by default, and "netty-throughput"
        on port 5455 with a batch delay of 50. -->
   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </connector>

      <connector name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
      </connector>
   </connectors>

   <!-- Acceptors the server listens on; hosts and ports mirror the connectors above. -->
   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </acceptor>

      <acceptor name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
         <param key="direct-deliver" value="false"/>
      </acceptor>
   </acceptors>

   <!-- The "guest" role may create and delete queues, consume and send on every address. -->
   <security-settings>
      <security-setting match="#">
         <permission type="createNonDurableQueue" roles="guest"/>
         <permission type="deleteNonDurableQueue" roles="guest"/>
         <permission type="createDurableQueue" roles="guest"/>
         <permission type="deleteDurableQueue" roles="guest"/>
         <permission type="consume" roles="guest"/>
         <permission type="send" roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <!-- 10485760 bytes = 10 MiB per address. With the BLOCK policy below, producers are
              blocked once an address holds this much, so messages that are consumed but
              never removed will eventually stall senders. -->
         <max-size-bytes>10485760</max-size-bytes>
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>

</configuration>
    
已邀请:
        使用HornetQ核心api时,您必须显式确认(acknowledge)每一条消息。我看不出您的测试中具体发生了什么。 如果您没有确认,这就是消息滞留在队列中的原因。我需要查看您的完整示例才能给出完整的答案。 另外:您应该使用以下方式创建会话:createSession(true,true,0)。 核心API提供了批量发送ACK的选项。您没有使用事务性会话,因此只有在达到serverLocator上配置的ackBatchSize时,ack才会被发送到服务器。使用上述设置后,只要您对消息调用acknowledge(),确认就会立即发送到服务器。 您当前使用的选项相当于JMS中带有特定DUPS_SIZE的DUPS_OK模式。 (在与您反复讨论之后,我编辑了最初的答案)
        设置 ackBatchSize 有助于解决此问题。谢谢您的帮助。

要回复问题请先登录注册