基于WebSocketClient,用于监听用户、组织、角色变更的业务场景的实时数据更新的方法

/ 0条评论 / 0 个点赞 / 1171人阅读

下载jar

<dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.3.5</version>
        </dependency>
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ResourceBundle;

import org.java_websocket.WebSocket.READYSTATE;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import com.zxlhdata.framework.msg.util.log.LayIMLog;
import com.zxlhdata.framework.service.IMServiceI;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

/**
 * 用于监听用户、组织、角色变更的业务场景的实时数据更新的类
 *  CopyRright(c)2021-xxxx:
 * Project:com.zxlhdata.framework.core.util 
 * Comments:<对此类的描述,可以引用系统设计中的描述>
 * JDKversionused:<JDK1.8> 
 * Author:何湘简 
 * CreateDate:2021年5月25日 下午3:23:27
 */
public class ZxlhWebSocketClient {
	private static final ResourceBundle bundle = java.util.ResourceBundle.getBundle("sysConfig");
	static IMServiceI iMService = ApplicationContextUtil.getContext().getBean(IMServiceI.class);
	static boolean isopen = false;// 是否连接成功!
	static boolean isNowConnect = false; //是否正在尝试连接

	public static void createConnect() throws Exception {
		//接收消息的应用id,由服务端发放,必须和服务端保持一致
		String receive_msg_app_id = bundle.getString("receive_msg_app_id");
		//服务端web_socket地址
		String web_socket_client_url = bundle.getString("web_socket_client_url");
		
		if (StringUtil.isNotEmpty(receive_msg_app_id) && StringUtil.isNotEmpty(web_socket_client_url)) {

			WebSocketClient client = new WebSocketClient(new URI(web_socket_client_url + receive_msg_app_id)) {

				@Override
				public void onOpen(ServerHandshake shake) {
					LayIMLog.error("握手...");
					for (Iterator<String> it = shake.iterateHttpFields(); it.hasNext();) {
						String key = it.next();
						LayIMLog.error(key + ":" + shake.getFieldValue(key));
					}
				}

				@Override
				public void onMessage(String paramString) {
					LayIMLog.error("接收到消息:" + paramString);
					JSONObject jsonMsg = JSONObject.fromObject(paramString);
					if(jsonMsg.containsKey("msg")) {
						JSONObject msg = jsonMsg.getJSONObject("msg");
						JSONObject dataObject = msg.getJSONObject("content");
						
						if(msg.containsKey("actionType") && msg.containsKey("tableName")) {
							if(StringUtil.isNotEmpty(msg.get("tableName"))) {
								String sql_ = "select  COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS where table_name = '" + msg.get("tableName") + "' ";
								String dbName = iMService.getDBName();
								if (StringUtil.isNotEmpty(dbName)) {
									sql_ += " AND table_schema = '" + dbName + "'";
								}
								String comma = "";
								StringBuffer insertKey = new StringBuffer();
								StringBuffer insertValue = new StringBuffer();
								List<Map<String, Object>> extList = iMService.queryForList(sql_);
								if(extList!=null && extList.size()>0) {
									String idValue="";
									//新增
									if("insert".equals(msg.get("actionType"))) {
										
											for (Map<String, Object> m : extList) {
												String value="";
												if(dataObject.containsKey(m.get("COLUMN_NAME"))) {
													value = oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString()));
													insertKey.append(comma + m.get("COLUMN_NAME"));
												}
												else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toLowerCase())) {
													value = oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toLowerCase()));
													insertKey.append(comma + m.get("COLUMN_NAME").toString().toLowerCase());
												}
												else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toUpperCase())) {
													value = oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toUpperCase()));
													insertKey.append(comma + m.get("COLUMN_NAME").toString().toUpperCase());
												}
												if("int".equals(m.get("DATA_TYPE"))
														||"bigint".equals(m.get("DATA_TYPE"))
														||"float".equals(m.get("DATA_TYPE"))
														||"double".equals(m.get("DATA_TYPE"))
														||"decimal".equals(m.get("DATA_TYPE"))
														||"numeric".equals(m.get("DATA_TYPE"))) {
													insertValue.append(comma + oConvertUtils.getString(value,"null"));
												}else if("datetime".equals(m.get("DATA_TYPE"))
														||"date".equals(m.get("DATA_TYPE"))) {
													if(StringUtil.isNotEmpty(value)) {
														insertValue.append(comma + "'" +value + "'");
													}else {
														insertValue.append(comma + "null");
													}
												}
												else {
													insertValue.append(comma + "'" +value + "'");
												}
												
												comma = ",";
												if("id".equals((m.get("COLUMN_NAME") + "").toLowerCase())) {
													idValue=value;
												}
											}
											
											List<Map<String, Object>> objList = iMService.queryForList("select * from "+msg.get("tableName")+" where id='"+idValue+"'");
											if(objList!=null && objList.size()>0) {
												return ;
											}
											
											sql_ = "INSERT INTO " + msg.get("tableName") + " (" + insertKey + ") VALUES (" + insertValue + ")";
											iMService.execute(sql_);
									}
									//修改
									if("update".equals(msg.get("actionType"))) {
										
										sql_ = " update " + msg.get("tableName") + " set ";
										for (Map<String, Object> m : extList) {
											String value="",fieldName="";
											if(dataObject.containsKey(m.get("COLUMN_NAME"))) {
												value=oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString()));
												fieldName=m.get("COLUMN_NAME")+"";
												
											}
											else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toLowerCase())) {
												value=oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toLowerCase()));
												fieldName=(m.get("COLUMN_NAME") + "").toLowerCase()+"";
												
											}
											else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toUpperCase())) {
												value=oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toUpperCase()));
												fieldName=(m.get("COLUMN_NAME") + "").toUpperCase();
												
											}
											if("int".equals(m.get("DATA_TYPE"))
													||"bigint".equals(m.get("DATA_TYPE"))
													||"float".equals(m.get("DATA_TYPE"))
													||"double".equals(m.get("DATA_TYPE"))
													||"decimal".equals(m.get("DATA_TYPE"))
													||"numeric".equals(m.get("DATA_TYPE"))) {
												sql_ += comma + fieldName + "=" + oConvertUtils.getString(value,"null")+"";
											}else if("datetime".equals(m.get("DATA_TYPE"))
													||"date".equals(m.get("DATA_TYPE"))) {
												if(StringUtil.isNotEmpty(value)) {
													sql_ += comma + fieldName + "='" + value+"'";
												}else {
													sql_ += comma + fieldName + "=null";
												}
											}else {
												sql_ += comma + fieldName + "='" + value+"'";
											}
											comma = ", ";
											if("id".equals((m.get("COLUMN_NAME") + "").toLowerCase())) {
												idValue=value;
											}
										}
										List<Map<String, Object>> objList = iMService.queryForList("select * from "+msg.get("tableName")+" where id='"+idValue+"'");
										if(objList==null || objList.size()==0) {
											return ;
										}
										iMService.execute(sql_ + " where  id='"+idValue+"'");
									}
									//删除
									if("delete".equals(msg.get("actionType"))) {
										iMService.execute("delete from "+msg.get("tableName")+" where id in ('"+dataObject.getString("ids").replace(",", "','")+"') ");
									}
								}
							}
						}
					}
				}

				@Override
				public void onClose(int paramInt, String paramString, boolean paramBoolean) {
					LayIMLog.error("关闭...");
					isopen = false;
					int createConnectNum = 0;
					while (!isopen && !isNowConnect) {
						isNowConnect = true;//正在尝试连接
						createConnectNum++;
						LayIMLog.error("重新连接失败,请检查网络!");
						LayIMLog.error("正在尝试第" + createConnectNum + "次连接!");
						try {
							ZxlhWebSocketClient.createConnect();
						} catch (Exception e) {
							e.printStackTrace();
						}
						
						if(!isopen) {
							isNowConnect = false;
						}
					}

				}

				@Override
				public void onError(Exception e) {
					LayIMLog.error("异常" + e);

				}

			};
			client.connect();
			
			int a=0;
			//判断连接状态,每个连接的次数在50秒以内,如果50秒以后还没连接上就跳出,进入下一个循环
			while (!client.getReadyState().equals(READYSTATE.OPEN) && a<=10) {
				a++;
				LayIMLog.error(web_socket_client_url + receive_msg_app_id + "==正在连接。。。。。。");
				try {
					Thread.sleep(5000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if(client.getReadyState().equals(READYSTATE.OPEN)) {
				isopen = true;
				isNowConnect = false;
				LayIMLog.error("连接成功!");
			}
		}
	}

	/**
	 * 接收WebSocket消息
	 * 
	 * @Description TODO
	 * @return void
	 * @Author 何湘简
	 * @Date 2021年5月25日 下午2:56:51
	 */
	public static void receiveMsg() {
		// 重新启动一个线程用于监听即时的数据修改推送消息
		new Thread() {
			public void run() {
				try {
					ZxlhWebSocketClient.createConnect();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}.start();
	}

	public static void main(String[] args) {
		try {
			ZxlhWebSocketClient.createConnect();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}