博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
如何在Java 环境下使用 HTTP 协议收发 MQ 消息
阅读量:6303 次
发布时间:2019-06-22

本文共 13242 字,大约阅读时间需要 44 分钟。

1. 准备环境

在工程 POM 文件添加 HTTP Java 客户端的依赖。

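<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-client</artifactId>
    <version>9.3.4.RC1</version>
</dependency>
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.1.11</version>
</dependency>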

2. 运行代码配置(user.properties)

您需要设置配置文件(user.properties)的相关内容,具体请参考申请 MQ 资源 。

#您在控制台创建的Topic
Topic=xxx
#公测url(注意以 / 结尾,示例代码会在其后直接拼接 message/)
URL=http://publictest-rest.ons.aliyun.com/
#阿里云身份验证码
Ak=xxx
#阿里云身份验证密钥
Sk=xxx
#MQ控制台创建的Producer ID
ProducerID=xxx
#MQ控制台创建的Consumer ID
ConsumerID=xxx

说明:URL 中的 Key、Tag 以及 POST 的 Content-Type 没有任何限制,只要确保 Key 和 Tag 相同且唯一即可;这些值也可以放在 user.properties 里面。

3. HTTP 发送消息示例代码
您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。

package com.aliyun.openservice.ons.http.demo;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Properties;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;

import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

/**
 * Demo producer that sends ten messages to an MQ topic over HTTP.
 *
 * <p>Connection settings (Topic, URL, Ak, Sk, ProducerID) are read from
 * {@code user.properties} on the classpath. Every request is signed: the string
 * {@code topic \n producerId \n md5(body) \n timestamp} is passed to
 * {@code AuthUtil.calSignature} together with the secret key, and the result is
 * sent in the {@code Signature} header.
 */
public class HttpProducer {
    public static String SIGNATURE = "Signature";
    public static String NUM = "num";
    public static String CONSUMERID = "ConsumerID";
    public static String PRODUCERID = "ProducerID";
    public static String TIMEOUT = "timeout";
    public static String TOPIC = "Topic";
    public static String AK = "AccessKey";
    public static String BODY = "body";
    public static String MSGHANDLE = "msgHandle";
    public static String TIME = "time";

    /** Name of the classpath resource holding the connection settings. */
    private static final String CONFIG_RESOURCE = "user.properties";

    /** Number of messages sent by one run of the demo. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Sends {@code MESSAGE_COUNT} messages and prints the status and body of each response.
     *
     * @param args unused
     * @throws Exception if the configuration cannot be loaded or a request fails
     */
    public static void main(String[] args) throws Exception {
        // Load and validate the configuration before any network resource is created.
        Properties properties = loadConfig();
        String topic = requireProperty(properties, "Topic");   // topic created in the console
        String url = withTrailingSlash(requireProperty(properties, "URL")); // REST endpoint
        String ak = requireProperty(properties, "Ak");          // access key
        String sk = requireProperty(properties, "Sk");          // secret key
        String pid = requireProperty(properties, "ProducerID"); // producer ID created in the console

        final Charset utf8 = Charset.forName("UTF-8");
        final String NEWLINE = "\n";
        final String body = "hello ons http";

        HttpClient httpClient = new HttpClient();
        httpClient.setMaxConnectionsPerDestination(1);
        httpClient.start();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // The same timestamp goes into the query string and into the signed string.
                String date = String.valueOf(new Date().getTime());
                Request req = httpClient.POST(
                        url + "message/?topic=" + topic + "&time=" + date + "&tag=http" + "&key=http");
                ContentProvider content = new StringContentProvider(body);
                req.content(content);

                String signString = topic + NEWLINE + pid + NEWLINE
                        + MD5.getInstance().getMD5String(body) + NEWLINE + date;
                System.out.println(signString);
                String sign = AuthUtil.calSignature(signString.getBytes(utf8), sk);

                req.header(SIGNATURE, sign);
                req.header(AK, ak);
                req.header(PRODUCERID, pid);

                ContentResponse response = req.send();
                System.out.println("send msg:" + response.getStatus() + response.getContentAsString());
            }
        } finally {
            // Stop the client once sending is finished so its resources are released.
            httpClient.stop();
        }
    }

    /** Reads {@code user.properties} from the classpath, closing the stream afterwards. */
    private static Properties loadConfig() throws Exception {
        Properties properties = new Properties();
        try (InputStream in = HttpProducer.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(CONFIG_RESOURCE + " not found on the classpath");
            }
            properties.load(in);
        }
        return properties;
    }

    /** Returns the trimmed value of a mandatory setting, failing with a clear message if it is absent. */
    private static String requireProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing '" + key + "' in " + CONFIG_RESOURCE);
        }
        return value.trim();
    }

    /** Ensures the base URL ends with '/' so that "message/..." can be appended to it safely. */
    private static String withTrailingSlash(String baseUrl) {
        return baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
    }
}

4. HTTP接收消息示例代码

请按以下说明设置相应参数并测试 HTTP 消息接收功能。

package com.aliyun.openservice.ons.http.demo;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;

import com.alibaba.fastjson.JSON;
// NOTE(review): MqttProducer is not referenced in this class; the import is kept from the original listing.
import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

/**
 * Demo consumer that polls an MQ topic over HTTP and acknowledges each message it receives.
 *
 * <p>Connection settings (Topic, URL, Ak, Sk, ConsumerID) are read from
 * {@code user.properties} on the classpath. Messages are fetched with a signed GET request
 * and each one is then removed with a signed DELETE request carrying its message handle.
 */
public class HttpConsumer {
    public static String SIGNATURE = "Signature";
    public static String NUM = "num";
    public static String CONSUMERID = "ConsumerID";
    public static String PRODUCERID = "ProducerID";
    public static String TIMEOUT = "timeout";
    public static String TOPIC = "Topic";
    public static String AK = "AccessKey";
    public static String BODY = "body";
    public static String MSGHANDLE = "msgHandle";
    public static String TIME = "time";

    /** Name of the classpath resource holding the connection settings. */
    private static final String CONFIG_RESOURCE = "user.properties";

    /** Value of the {@code num} query parameter sent with every fetch request. */
    private static final int FETCH_NUM = 32;

    /** Pause between two polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;

    /** Pause after a failed poll, in milliseconds, so a persistent error does not cause a tight retry loop. */
    private static final long ERROR_BACKOFF_MS = 1000L;

    /**
     * Polls the topic until the thread is interrupted, printing and deleting every received message.
     *
     * @param args unused
     * @throws Exception if the configuration cannot be loaded or the HTTP client cannot be started or stopped
     */
    public static void main(String[] args) throws Exception {
        // Load and validate the configuration before any network resource is created.
        Properties properties = loadConfig();
        String topic = requireProperty(properties, "Topic");   // topic created in the console
        String url = withTrailingSlash(requireProperty(properties, "URL")); // REST endpoint
        String ak = requireProperty(properties, "Ak");          // access key
        String sk = requireProperty(properties, "Sk");          // secret key
        String cid = requireProperty(properties, "ConsumerID"); // consumer ID created in the console

        final Charset utf8 = Charset.forName("UTF-8");
        final String NEWLINE = "\n";

        HttpClient httpClient = new HttpClient();
        httpClient.setMaxConnectionsPerDestination(1);
        httpClient.start();
        System.out.println(NEWLINE + NEWLINE);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Fetch: signed string is topic \n consumerId \n timestamp.
                    String date = String.valueOf(new Date().getTime());
                    Request req = httpClient
                            .newRequest(url + "message/?topic=" + topic + "&time=" + date + "&num=" + FETCH_NUM)
                            .method(HttpMethod.GET);
                    String signString = topic + NEWLINE + cid + NEWLINE + date;
                    String sign = AuthUtil.calSignature(signString.getBytes(utf8), sk);
                    req.header(SIGNATURE, sign);
                    req.header(AK, ak);
                    req.header(CONSUMERID, cid);

                    long start = System.currentTimeMillis();
                    ContentResponse response = req.send();
                    String payload = response.getContentAsString();
                    System.out.println("get cost:" + (System.currentTimeMillis() - start) / 1000
                            + "    " + response.getStatus() + "    " + payload);

                    List<SimpleMessage> list = null;
                    if (payload != null && !payload.isEmpty()) {
                        list = JSON.parseArray(payload, SimpleMessage.class);
                    }
                    if (list == null || list.isEmpty()) {
                        Thread.sleep(POLL_INTERVAL_MS);
                        continue;
                    }

                    System.out.println("size is :" + list.size());
                    for (SimpleMessage simpleMessage : list) {
                        System.out.println("receive msg:" + simpleMessage.getBody()
                                + " born time " + simpleMessage.getBornTime());

                        // Acknowledge: signed string is topic \n consumerId \n msgHandle \n timestamp.
                        date = String.valueOf(new Date().getTime());
                        req = httpClient
                                .newRequest(url + "message/?msgHandle=" + simpleMessage.getMsgHandle()
                                        + "&topic=" + topic + "&time=" + date)
                                .method(HttpMethod.DELETE);
                        signString = topic + NEWLINE + cid + NEWLINE
                                + simpleMessage.getMsgHandle() + NEWLINE + date;
                        sign = AuthUtil.calSignature(signString.getBytes(utf8), sk);
                        req.header(SIGNATURE, sign);
                        req.header(AK, ak);
                        req.header(CONSUMERID, cid);
                        response = req.send();
                        System.out.println("delete msg:" + response.toString());
                    }
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag; the loop condition then ends polling.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // Keep polling after a failed round, but wait first instead of retrying immediately.
                    e.printStackTrace();
                    try {
                        Thread.sleep(ERROR_BACKOFF_MS);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        } finally {
            // Stop the client once polling has ended so its resources are released.
            httpClient.stop();
        }
    }

    /** Reads {@code user.properties} from the classpath, closing the stream afterwards. */
    private static Properties loadConfig() throws Exception {
        Properties properties = new Properties();
        try (InputStream in = HttpConsumer.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(CONFIG_RESOURCE + " not found on the classpath");
            }
            properties.load(in);
        }
        return properties;
    }

    /** Returns the trimmed value of a mandatory setting, failing with a clear message if it is absent. */
    private static String requireProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing '" + key + "' in " + CONFIG_RESOURCE);
        }
        return value.trim();
    }

    /** Ensures the base URL ends with '/' so that "message/..." can be appended to it safely. */
    private static String withTrailingSlash(String baseUrl) {
        return baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
    }
}

5. HTTP示例程序工具类

(1)消息封装类: SimpleMessage.java

package com.aliyun.openservice.ons.http.demo;    public class SimpleMessage {        private String body;        private String msgId;        private String bornTime;        private String msgHandle;        private int reconsumeTimes;        private String tag;        public void setTag(String tag) {            this.tag = tag;        }        public String getTag() {            return tag;        }        public int getReconsumeTimes() {            return reconsumeTimes;        }        public void setReconsumeTimes(int reconsumeTimes) {            this.reconsumeTimes = reconsumeTimes;        }        public void setMsgHandle(String msgHandle) {            this.msgHandle = msgHandle;        }        public String getMsgHandle() {            return msgHandle;        }        public String getBody() {            return body;        }        public void setBody(String body) {            this.body = body;        }        public String getMsgId() {            return msgId;        }        public void setMsgId(String msgId) {            this.msgId = msgId;        }        public String getBornTime() {            return bornTime;        }        public void setBornTime(String bornTime) {            this.bornTime = bornTime;        }    }

(2)字符串签名类: MD5.java

package com.aliyun.openservice.ons.http.demo;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.LoggerFactory;

/**
 * Singleton helper that computes MD5 digests and converts them to and from
 * 32-character lower-case hexadecimal strings.
 *
 * <p>A single {@link MessageDigest} instance is shared by all callers; every use of it
 * is guarded by {@code opLock}.
 */
public class MD5 {
    private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);

    /** Hex alphabet used when encoding; index == nibble value. */
    private static final char[] digits =
            { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };

    /** Reverse lookup: hex character to nibble value. */
    private static final Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);

    static {
        for (int i = 0; i < digits.length; ++i) {
            rDigits.put(digits[i], i);
        }
    }

    // Created after the lookup tables above so they are ready before the instance exists.
    private static final MD5 me = new MD5();

    private MessageDigest mHasher;
    private final ReentrantLock opLock = new ReentrantLock();

    private MD5() {
        try {
            this.mHasher = MessageDigest.getInstance("md5");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** @return the shared instance */
    public static MD5 getInstance() {
        return me;
    }

    /**
     * @param content text to hash; encoded as UTF-8 before hashing
     * @return the MD5 digest of {@code content} as 32 lower-case hex characters
     */
    public String getMD5String(String content) {
        return this.bytes2string(this.hash(content));
    }

    /**
     * @param content bytes to hash
     * @return the MD5 digest of {@code content} as 32 lower-case hex characters
     */
    public String getMD5String(byte[] content) {
        return this.bytes2string(this.hash(content));
    }

    /**
     * @param content bytes to hash
     * @return the raw 16-byte MD5 digest of {@code content}
     */
    public byte[] getMD5Bytes(byte[] content) {
        return this.hash(content);
    }

    /**
     * Hashes the UTF-8 encoding of a string.
     *
     * @param str text to hash
     * @return the raw 16-byte MD5 digest
     */
    public byte[] hash(String str) {
        // Charset lookup cannot raise the checked UnsupportedEncodingException for UTF-8.
        return this.hash(str.getBytes(Charset.forName("UTF-8")));
    }

    /**
     * Hashes a byte array.
     *
     * @param data bytes to hash
     * @return the raw 16-byte MD5 digest
     * @throws IllegalArgumentException if the digest returned is not 16 bytes long
     */
    public byte[] hash(byte[] data) {
        this.opLock.lock();
        try {
            byte[] bt = this.mHasher.digest(data);
            if (null == bt || bt.length != 16) {
                throw new IllegalArgumentException("md5 need");
            }
            return bt;
        } finally {
            this.opLock.unlock();
        }
    }

    /**
     * Encodes bytes as lower-case hexadecimal, two characters per byte.
     *
     * @param bt bytes to encode
     * @return the hex string, twice as long as {@code bt}
     */
    public String bytes2string(byte[] bt) {
        int l = bt.length;
        char[] out = new char[l << 1];
        for (int i = 0, j = 0; i < l; i++) {
            out[j++] = digits[(0xF0 & bt[i]) >>> 4]; // high nibble
            out[j++] = digits[0x0F & bt[i]];         // low nibble
        }
        String hex = new String(out);
        if (log.isDebugEnabled()) {
            log.debug("[hash]" + hex);
        }
        return hex;
    }

    /**
     * Decodes a 32-character hexadecimal string into its 16 bytes.
     * Both lower-case and upper-case hex digits are accepted.
     *
     * @param str the hex string to decode
     * @return the 16 decoded bytes
     * @throws NullPointerException if {@code str} is null
     * @throws IllegalArgumentException if {@code str} is not 32 characters long
     *         or contains a character that is not a hexadecimal digit
     */
    public byte[] string2bytes(String str) {
        if (null == str) {
            throw new NullPointerException("Argument is not allowed empty");
        }
        if (str.length() != 32) {
            throw new IllegalArgumentException("String length must equals 32");
        }
        byte[] data = new byte[16];
        for (int i = 0; i < 16; ++i) {
            int h = hexValue(str.charAt(i * 2));
            int l = hexValue(str.charAt(i * 2 + 1));
            data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);
        }
        return data;
    }

    /** Maps one hex character to its value, rejecting anything outside 0-9, a-f, A-F. */
    private static int hexValue(char c) {
        Integer value = rDigits.get(Character.toLowerCase(c));
        if (value == null) {
            throw new IllegalArgumentException("Not a hexadecimal digit: '" + c + "'");
        }
        return value.intValue();
    }
}

转载地址:http://ekfxa.baihongyu.com/

你可能感兴趣的文章
在Linux上高效开发的7个建议
查看>>
java数据结构 - 数组使用的代码
查看>>
个人简历-项目经验
查看>>
swoole异步任务task处理慢请求简单实例
查看>>
DHCP
查看>>
oracle数据泵导入分区表统计信息报错(四)
查看>>
spring技术内幕读书笔记之IoC容器的学习
查看>>
细说多线程(五) —— CLR线程池的I/O线程
查看>>
JavaScript instanceof和typeof的区别
查看>>
Hadoop文件系统详解-----(一)
查看>>
《面向模式的软件体系结构2-用于并发和网络化对象模式》读书笔记(8)--- 主动器...
查看>>
状态码
查看>>
我的友情链接
查看>>
用sqlplus远程连接oracle命令
查看>>
多年一直想完善的自由行政审批流程组件【2002年PHP,2008年.NET,2010年完善数据设计、代码实现】...
查看>>
自动生成四则运算题目
查看>>
【翻译】使用新的Sencha Cmd 4命令app watch
查看>>
【前台】【单页跳转】整个项目实现单页面跳转,抛弃iframe
查看>>
因为你是前端程序员!
查看>>
数据库设计中的14个技巧
查看>>