下载 jar 包:在项目的 pom.xml 中添加以下 Maven 依赖(Java-WebSocket 1.3.5)
<dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.3.5</version> </dependency>
import java.net.URI; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.ResourceBundle; import org.java_websocket.WebSocket.READYSTATE; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import com.zxlhdata.framework.msg.util.log.LayIMLog; import com.zxlhdata.framework.service.IMServiceI; import net.sf.json.JSONArray; import net.sf.json.JSONObject; /** * 用于监听用户、组织、角色变更的业务场景的实时数据更新的类 * CopyRright(c)2021-xxxx: * Project:com.zxlhdata.framework.core.util * Comments:<对此类的描述,可以引用系统设计中的描述> * JDKversionused:<JDK1.8> * Author:何湘简 * CreateDate:2021年5月25日 下午3:23:27 */ public class ZxlhWebSocketClient { private static final ResourceBundle bundle = java.util.ResourceBundle.getBundle("sysConfig"); static IMServiceI iMService = ApplicationContextUtil.getContext().getBean(IMServiceI.class); static boolean isopen = false;// 是否连接成功! static boolean isNowConnect = false; //是否正在尝试连接 public static void createConnect() throws Exception { //接收消息的应用id,由服务端发放,必须和服务端保持一致 String receive_msg_app_id = bundle.getString("receive_msg_app_id"); //服务端web_socket地址 String web_socket_client_url = bundle.getString("web_socket_client_url"); if (StringUtil.isNotEmpty(receive_msg_app_id) && StringUtil.isNotEmpty(web_socket_client_url)) { WebSocketClient client = new WebSocketClient(new URI(web_socket_client_url + receive_msg_app_id)) { @Override public void onOpen(ServerHandshake shake) { LayIMLog.error("握手..."); for (Iterator<String> it = shake.iterateHttpFields(); it.hasNext();) { String key = it.next(); LayIMLog.error(key + ":" + shake.getFieldValue(key)); } } @Override public void onMessage(String paramString) { LayIMLog.error("接收到消息:" + paramString); JSONObject jsonMsg = JSONObject.fromObject(paramString); if(jsonMsg.containsKey("msg")) { JSONObject msg = jsonMsg.getJSONObject("msg"); JSONObject dataObject = msg.getJSONObject("content"); if(msg.containsKey("actionType") && 
msg.containsKey("tableName")) { if(StringUtil.isNotEmpty(msg.get("tableName"))) { String sql_ = "select COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS where table_name = '" + msg.get("tableName") + "' "; String dbName = iMService.getDBName(); if (StringUtil.isNotEmpty(dbName)) { sql_ += " AND table_schema = '" + dbName + "'"; } String comma = ""; StringBuffer insertKey = new StringBuffer(); StringBuffer insertValue = new StringBuffer(); List<Map<String, Object>> extList = iMService.queryForList(sql_); if(extList!=null && extList.size()>0) { String idValue=""; //新增 if("insert".equals(msg.get("actionType"))) { for (Map<String, Object> m : extList) { String value=""; if(dataObject.containsKey(m.get("COLUMN_NAME"))) { value = oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString())); insertKey.append(comma + m.get("COLUMN_NAME")); } else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toLowerCase())) { value = oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toLowerCase())); insertKey.append(comma + m.get("COLUMN_NAME").toString().toLowerCase()); } else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toUpperCase())) { value = oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toUpperCase())); insertKey.append(comma + m.get("COLUMN_NAME").toString().toUpperCase()); } if("int".equals(m.get("DATA_TYPE")) ||"bigint".equals(m.get("DATA_TYPE")) ||"float".equals(m.get("DATA_TYPE")) ||"double".equals(m.get("DATA_TYPE")) ||"decimal".equals(m.get("DATA_TYPE")) ||"numeric".equals(m.get("DATA_TYPE"))) { insertValue.append(comma + oConvertUtils.getString(value,"null")); }else if("datetime".equals(m.get("DATA_TYPE")) ||"date".equals(m.get("DATA_TYPE"))) { if(StringUtil.isNotEmpty(value)) { insertValue.append(comma + "'" +value + "'"); }else { insertValue.append(comma + "null"); } } else { insertValue.append(comma + "'" +value + "'"); } comma = ","; if("id".equals((m.get("COLUMN_NAME") + 
"").toLowerCase())) { idValue=value; } } List<Map<String, Object>> objList = iMService.queryForList("select * from "+msg.get("tableName")+" where id='"+idValue+"'"); if(objList!=null && objList.size()>0) { return ; } sql_ = "INSERT INTO " + msg.get("tableName") + " (" + insertKey + ") VALUES (" + insertValue + ")"; iMService.execute(sql_); } //修改 if("update".equals(msg.get("actionType"))) { sql_ = " update " + msg.get("tableName") + " set "; for (Map<String, Object> m : extList) { String value="",fieldName=""; if(dataObject.containsKey(m.get("COLUMN_NAME"))) { value=oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString())); fieldName=m.get("COLUMN_NAME")+""; } else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toLowerCase())) { value=oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toLowerCase())); fieldName=(m.get("COLUMN_NAME") + "").toLowerCase()+""; } else if(dataObject.containsKey((m.get("COLUMN_NAME") + "").toUpperCase())) { value=oConvertUtils.getString(dataObject.getString(m.get("COLUMN_NAME").toString().toUpperCase())); fieldName=(m.get("COLUMN_NAME") + "").toUpperCase(); } if("int".equals(m.get("DATA_TYPE")) ||"bigint".equals(m.get("DATA_TYPE")) ||"float".equals(m.get("DATA_TYPE")) ||"double".equals(m.get("DATA_TYPE")) ||"decimal".equals(m.get("DATA_TYPE")) ||"numeric".equals(m.get("DATA_TYPE"))) { sql_ += comma + fieldName + "=" + oConvertUtils.getString(value,"null")+""; }else if("datetime".equals(m.get("DATA_TYPE")) ||"date".equals(m.get("DATA_TYPE"))) { if(StringUtil.isNotEmpty(value)) { sql_ += comma + fieldName + "='" + value+"'"; }else { sql_ += comma + fieldName + "=null"; } }else { sql_ += comma + fieldName + "='" + value+"'"; } comma = ", "; if("id".equals((m.get("COLUMN_NAME") + "").toLowerCase())) { idValue=value; } } List<Map<String, Object>> objList = iMService.queryForList("select * from "+msg.get("tableName")+" where id='"+idValue+"'"); if(objList==null || objList.size()==0) { return 
; } iMService.execute(sql_ + " where id='"+idValue+"'"); } //删除 if("delete".equals(msg.get("actionType"))) { iMService.execute("delete from "+msg.get("tableName")+" where id in ('"+dataObject.getString("ids").replace(",", "','")+"') "); } } } } } } @Override public void onClose(int paramInt, String paramString, boolean paramBoolean) { LayIMLog.error("关闭..."); isopen = false; int createConnectNum = 0; while (!isopen && !isNowConnect) { isNowConnect = true;//正在尝试连接 createConnectNum++; LayIMLog.error("重新连接失败,请检查网络!"); LayIMLog.error("正在尝试第" + createConnectNum + "次连接!"); try { ZxlhWebSocketClient.createConnect(); } catch (Exception e) { e.printStackTrace(); } if(!isopen) { isNowConnect = false; } } } @Override public void onError(Exception e) { LayIMLog.error("异常" + e); } }; client.connect(); int a=0; //判断连接状态,每个连接的次数在50秒以内,如果50秒以后还没连接上就跳出,进入下一个循环 while (!client.getReadyState().equals(READYSTATE.OPEN) && a<=10) { a++; LayIMLog.error(web_socket_client_url + receive_msg_app_id + "==正在连接。。。。。。"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } if(client.getReadyState().equals(READYSTATE.OPEN)) { isopen = true; isNowConnect = false; LayIMLog.error("连接成功!"); } } } /** * 接收WebSocket消息 * * @Description TODO * @return void * @Author 何湘简 * @Date 2021年5月25日 下午2:56:51 */ public static void receiveMsg() { // 重新启动一个线程用于监听即时的数据修改推送消息 new Thread() { public void run() { try { ZxlhWebSocketClient.createConnect(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }.start(); } public static void main(String[] args) { try { ZxlhWebSocketClient.createConnect(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
本文由 admin 创作,采用知识共享署名 4.0 国际许可协议进行许可。
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:2022-08-09 22:39:32