素材牛VIP会员
java支持百万级消息推送组件
 祗***春  分类:Java代码  人气:2053  回帖:1  发布于5年前 收藏

1.准备工作:

下载 comet4j-tomcat7.jar。该组件目前只提供 tomcat6 和 tomcat7 两个版本,jar 包必须与所用的 Tomcat 版本对应。经我测试,在 tomcat8 下使用 comet4j-tomcat7.jar 也可以正常推送。

2.pom.xml配置

在pom.xml中加入下面代码即可


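<dependency>
    <groupId>comet4j-tomcat7</groupId>
    <artifactId>test</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${basedir}/src/main/webapp/WEB-INF/lib/comet4j-tomcat7.jar</systemPath>
</dependency>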

3.修改tomcat配置文件server.xml

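(原文此处的配置片段在转载时丢失,以下按 Comet4J 的安装说明补全:需要把 Connector 的 protocol 改为 NIO 实现。)

修改之前为:

<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />

修改之后为:

<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" redirectPort="8443" />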

4.修改spring /web.xml配置


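<listener>
    <description>Comet4J容器侦听</description>
    <listener-class>org.comet4j.core.CometAppListener</listener-class>
</listener>

<listener>
    <description>监听我们自己的推送类</description>
    <listener-class>com.util.CometUtil</listener-class>
</listener>

<servlet>
    <description>客户端访问入口</description>
    <servlet-name>CometServlet</servlet-name>
    <servlet-class>org.comet4j.core.CometServlet</servlet-class>
</servlet>

<servlet-mapping>
    <servlet-name>CometServlet</servlet-name>
    <url-pattern>/conn</url-pattern>
</servlet-mapping>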

5.java后端推送工具类

CometUtil.java

import java.util.logging.Level;
import java.util.logging.Logger;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.comet4j.core.CometConnection;
import org.comet4j.core.CometContext;
import org.comet4j.core.CometEngine;
import org.comet4j.core.event.ConnectEvent;
import org.comet4j.core.listener.ConnectListener;

import com.Comet;

public class CometUtil extends ConnectListener implements ServletContextListener {
/**
* 初始化上下文
*/
public void contextInitialized(ServletContextEvent arg0) {
// CometContext : Comet4J上下文,负责初始化配置、引擎对象、连接器对象、消息缓存等。
CometContext cc = CometContext.getInstance();
// 注册频道,即标识哪些字段可用当成频道,用来作为向前台传送数据的“通道”
cc.registChannel(Constant.CHANNEL_MSGCOUNT);
cc.registChannel(Constant.CHANNEL_MSG_DATA);
//添加监听器
CometEngine engine = CometContext.getInstance().getEngine();
engine.addConnectListener(this);
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
// TODO Auto-generated method stub
}
@Override
public boolean handleEvent(ConnectEvent connEvent){
// TODO Auto-generated method stub
final CometConnection conn = connEvent.getConn();
Object userId = conn.getRequest().getSession().getAttribute("currentUserId");
CacheManager.putContent(userId.toString(), connEvent);
return true;
}
private void doCache(final CometConnection conn,String userId) {
if (userId != null) {
CacheManager.putContent(conn.getId(), String.valueOf(userId), Constant.EXPIRE_AFTER_ONE_HOUR);
}
}
/**
* 推送给所有的客户端
* @param comet
*/
public void pushToAll(Comet comet){
try {
CometEngine engine = CometContext.getInstance().getEngine();
//推送到所有客户端
engine.sendToAll(Constant.CHANNEL_MSGCOUNT,comet.getMsgCount());
engine.sendToAll(Constant.CHANNEL_MSG_DATA,comet.getMsgData());
} catch (Exception e) {
// TODO: handle exception
System.out.println(e.getMessage());
}

}
/**
* 推送给指定客户端
* @param comet
*/
public void pushTo(Comet comet){
try {
ConnectEvent connEvent = (ConnectEvent) CacheManager.getContent(comet.getUserId()).getValue();
final CometConnection conn = connEvent.getConn();
//建立连接和用户的关系
doCache(conn,comet.getUserId());
final String connId = conn.getId();
CometEngine engine = CometContext.getInstance().getEngine();
if (CacheManager.getContent(connId).isExpired()) {
doCache(conn,comet.getUserId());
}
//推送到指定的客户端
engine.sendTo(Constant.CHANNEL_MSGCOUNT, engine.getConnection(connId), comet.getMsgCount());
engine.sendTo(Constant.CHANNEL_MSG_DATA, engine.getConnection(connId), comet.getMsgData());
} catch (Exception e) {
// TODO: handle exception
System.out.println(e.getMessage());
}
}

Constant.java


/** Shared constants for the Comet push helpers. */
public class Constant {
    /**
     * Lifetime of a cached entry, in milliseconds. CacheManager adds this
     * value to the current epoch-millisecond clock to compute the expiry
     * instant, so one hour is 3,600,000 (the previous value of 30 meant
     * 30 milliseconds).
     */
    public static final long EXPIRE_AFTER_ONE_HOUR = 60L * 60L * 1000L;

    /** Channel on which the number of messages is pushed. */
    public static final String CHANNEL_MSGCOUNT = "msgCount";

    /** Channel on which the message data is pushed. */
    public static final String CHANNEL_MSG_DATA = "msgData";
}

Comet.java

import java.util.List;
import java.util.Map;

/**
 * Payload of one push: the target user, a message count and the message data.
 *
 * <p>NOTE(review): the element type of the message list was lost from the
 * original declaration; {@code List<?>} accepts any list. Narrow it if the
 * real element type is known.
 */
public class Comet {
    /** Id of the user to push to; only needed for targeted pushes. */
    private String userId;
    /** Message count, already formatted as text. */
    private String msgCount;
    /** Message data sent on the data channel. */
    private List<?> msgData;

    /** @return the id of the target user, or {@code null} if none was set */
    public String getUserId() {
        return userId;
    }

    /** @param userId the id of the target user */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @return the message count, or {@code null} if none was set */
    public String getMsgCount() {
        return msgCount;
    }

    /** @param msgCount the message count as text */
    public void setMsgCount(String msgCount) {
        this.msgCount = msgCount;
    }

    /** @return the message data, or {@code null} if none was set */
    public List<?> getMsgData() {
        return msgData;
    }

    /** @param msgData the message data; the list is stored as given, not copied */
    public void setMsgData(List<?> msgData) {
        this.msgData = msgData;
    }
}

Cache.java

/**
 * One cache entry: a key, an arbitrary value, an expiry timestamp and a flag
 * recording whether the entry has been found to be expired.
 */
public class Cache {
    private String key;
    private Object value;
    /** Expiry timestamp; 0 until set. */
    private long timeOut;
    private boolean expired;

    /** Creates an empty entry; populate it through the setters. */
    public Cache() {
    }

    /**
     * Creates a fully populated entry.
     *
     * @param key     the cache key
     * @param value   the cached value; any object is accepted, matching
     *                {@link #setValue(Object)} (this parameter used to be
     *                limited to {@code String})
     * @param timeOut the expiry timestamp
     * @param expired whether the entry is already considered expired
     */
    public Cache(String key, Object value, long timeOut, boolean expired) {
        this.key = key;
        this.value = value;
        this.timeOut = timeOut;
        this.expired = expired;
    }

    /** @return the cache key */
    public String getKey() {
        return key;
    }

    /** @return the expiry timestamp */
    public long getTimeOut() {
        return timeOut;
    }

    /** @return the cached value */
    public Object getValue() {
        return value;
    }

    /** @param string the cache key */
    public void setKey(String string) {
        key = string;
    }

    /** @param l the expiry timestamp */
    public void setTimeOut(long l) {
        timeOut = l;
    }

    /** @param object the value to cache */
    public void setValue(Object object) {
        value = object;
    }

    /** @return {@code true} if the entry has been marked as expired */
    public boolean isExpired() {
        return expired;
    }

    /** @param b {@code true} to mark the entry as expired */
    public void setExpired(boolean b) {
        expired = b;
    }
}

CacheManager.java


public class CacheManager {
    private static HashMap cacheMap = new HashMap();

    /**
     * This class is singleton so private constructor is used.
     */
    private CacheManager() {
            super();
    }

    /**
     * returns cache item from hashmap
     * @param key
     * @return Cache
     */
    private synchronized static Cache getCache(String key) {
            return (Cache)cacheMap.get(key);
    }

    /**
     * Looks at the hashmap if a cache item exists or not
     * @param key
     * @return Cache
     */
    private synchronized static boolean hasCache(String key) {
            return cacheMap.containsKey(key);
    }

    /**
     * Invalidates all cache
     */
    public synchronized static void invalidateAll() {
            cacheMap.clear();
    }

    /**
     * Invalidates a single cache item
     * @param key
     */
    public synchronized static void invalidate(String key) {
            cacheMap.remove(key);
    }

    /**
     * Adds new item to cache hashmap
     * @param key
     * @return Cache
     */
    private synchronized static void putCache(String key, Cache object) {
       cacheMap.put(key, object);
    }

    /**
     * Reads a cache item's content
     * @param key
     * @return
     */
    public static Cache getContent(String key) {
             if (hasCache(key)) {
                    Cache cache = getCache(key);
                    if (cacheExpired(cache)) {
                            cache.setExpired(true);
                    }
                    return cache;
             } else {
                     return null;
             }
    }

    /**
     *
     * @param key
     * @param content
     * @param ttl
     */
    public static void putContent(String key, Object content, long ttl) {
            Cache cache = new Cache();
            cache.setKey(key);
            cache.setValue(content);
            cache.setTimeOut(ttl + new Date().getTime());
            cache.setExpired(false);
            putCache(key, cache);
    }
    public static void putContent(String key, Object content) {
        Cache cache = new Cache();
        cache.setKey(key);
        cache.setValue(content);
        cache.setExpired(false);
        putCache(key, cache);
}

    /** @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} */
    private static boolean cacheExpired(Cache cache){
            if (cache == null) {
                    return false;
            }
            long milisNow = new Date().getTime();
            long milisExpire = cache.getTimeOut();
            if (milisExpire < 0) { // Cache never expires
                    return false;
            } else if (milisNow >= milisExpire) {
                    return true;
            } else {
                    return false;
            }
    }
}

6、前端jsp代码

在前端要显示推送的页面引入js

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>
<%@taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<%@taglib prefix="fn" uri="http://java.sun.com/jsp/jstl/functions" %>




消息数量:

消息数据:


经过以上的工作,我们就可以实现推送了 ,项目启动后,在任何类中调用下面的代码就可以推送给前端了,例如:

// Push to every connected client.
// msgCount and resultList are whatever the caller wants to send.

Comet comet = new Comet();
comet.setMsgCount(String.valueOf(msgCount));
comet.setMsgData(resultList);

new CometUtil().pushToAll(comet);

// Push to exactly one client.
// msgCount and resultList are whatever the caller wants to send.

Comet comet = new Comet();
comet.setUserId("1"); // user id that the front end stored in the session

comet.setMsgCount(String.valueOf(msgCount));
comet.setMsgData(resultList);

new CometUtil().pushTo(comet);


如果要实现实时推送,可以用 spring 定时任务定时扫描需要推送的内容,然后再调用推送工具类。

讨论这个帖子(1)垃圾回帖将一律封号处理……

Lv1 新人
qw***32 职业无 5年前#1

从来没处理过这个级别的数据

 文明上网,理性发言!   😉 阿里云幸运券,戳我领取