<!-- Build-wide settings: UTF-8 for sources and generated reports, Java 8 for compilation. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- NOTE(review): java.version is not referenced anywhere in this fragment; it presumably serves a parent POM or is purely informational. Confirm before relying on it. -->
<java.version>1.8</java.version>
<!-- maven.compiler.source / maven.compiler.target are the properties maven-compiler-plugin reads for its source and target levels. -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<!--
  Runtime dependencies: the jetcd client plus SLF4J routed to log4j 1.x.

  Exactly one SLF4J binding may be on the classpath. This list previously also
  declared slf4j-nop and slf4j-simple (the latter at a mismatched 1.7.5), which
  makes SLF4J print a "multiple bindings" warning and pick one of them in a
  classpath-dependent way. Only slf4j-log4j12 is kept because log4j itself is
  declared explicitly below.
-->
<dependencies>
  <!-- etcd v3 Java client used by EtcdUtil1. -->
  <dependency>
    <groupId>com.coreos</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.0.2</version>
  </dependency>
  <!-- Logging facade; keep its version aligned with the binding below. -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
  </dependency>
  <!-- The single SLF4J binding: forwards SLF4J calls to log4j 1.2. -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
  </dependency>
  <!-- NOTE(review): log4j 1.2.x is end-of-life; consider migrating to a maintained backend. -->
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
</dependencies>
<!-- Packaging: maven-assembly-plugin with the predefined jar-with-dependencies descriptor, which produces a single jar containing the project classes together with its dependencies. -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.5</version>
<configuration>
<archive>
<manifest>
<!-- Written to the jar manifest as Main-Class. NOTE(review): the Java source accompanying this POM lives in package com.gemdale.iot and shows no Main class, so com.xxg.Main looks like a leftover placeholder. Confirm the real entry point. -->
<mainClass>com.xxg.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<!-- No <executions> element is declared here, so this configuration alone does not bind the assembly to a lifecycle phase; presumably it is invoked explicitly (e.g. mvn package assembly:single). -->
</plugin>
</plugins>
</build>
package com.gemdale.iot;
import com.coreos.jetcd.Client;
import com.coreos.jetcd.KV;
import com.coreos.jetcd.Watch;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Static helpers around one shared jetcd {@link Client}, plus a small
 * instance-level example ({@link #init()}) that loads a key and then watches
 * another key for changes on a background thread.
 *
 * <p>The endpoint is read once from the {@code IPPORT} system property
 * (for example {@code -DIPPORT=http://host:2379}); when the property is absent
 * the built-in default address is used.
 */
public class EtcdUtil1 {
    // Shared etcd client. Volatile so that the instance built inside
    // getEtcdClient() is safely visible to every thread that reads it.
    private static volatile Client client = null;

    // etcd endpoint; resolved once when the class is loaded.
    private static final String IPPORT = System.getProperty("IPPORT", "http://10.34.4.47:11000");

    static {
        // Build the client as soon as the class is loaded, as before.
        getEtcdClient();
    }

    /**
     * Returns the shared etcd client, creating it on first use.
     *
     * <p>The null check is repeated inside the lock so that two threads racing
     * through the first check cannot each build (and leak) their own client.
     *
     * @return the shared client, never {@code null}
     */
    public static Client getEtcdClient() {
        Client current = client;
        if (current == null) {
            synchronized (EtcdUtil1.class) {
                current = client;
                if (current == null) {
                    current = Client.builder().lazyInitialization(false).endpoints(IPPORT).build();
                    client = current;
                }
            }
        }
        return current;
    }

    /**
     * Looks up the value stored under the given key.
     *
     * @param key configuration key
     * @return the value decoded as UTF-8, or {@code null} when the key has no entry
     * @throws Exception if the lookup fails or the waiting thread is interrupted
     */
    public static String getEtcdValueByKey(String key) throws Exception {
        KeyValue kv = getEtcdKeyValueByKey(key);
        return kv == null ? null : kv.getValue().toStringUtf8();
    }

    /**
     * Looks up the key/value entry stored under the given key.
     *
     * @param key configuration key
     * @return the first matching entry, or {@code null} when there is none
     * @throws Exception if the lookup fails or the waiting thread is interrupted
     */
    public static KeyValue getEtcdKeyValueByKey(String key) throws Exception {
        KV kvClient = getEtcdClient().getKVClient();
        try {
            List<KeyValue> kvs = kvClient.get(ByteSequence.fromString(key)).get().getKvs();
            return kvs.isEmpty() ? null : kvs.get(0);
        } finally {
            // Close even when the lookup throws, so the KV client is never left open.
            kvClient.close();
        }
    }

    /**
     * Creates or overwrites the entry for the given key.
     *
     * <p>NOTE(review): the request is issued but its result is not awaited, so
     * this method can return before etcd has acknowledged the write and a
     * failed write is not reported to the caller. Kept as-is to preserve the
     * existing non-blocking behavior.
     *
     * @param key   configuration key
     * @param value new value, stored as UTF-8 bytes
     * @throws Exception retained for source compatibility with existing callers
     */
    public static void putEtcdValueByKey(String key, String value) throws Exception {
        getEtcdClient().getKVClient()
                .put(ByteSequence.fromString(key), ByteSequence.fromBytes(value.getBytes(StandardCharsets.UTF_8)));
    }

    /**
     * Deletes the entry for the given key.
     *
     * <p>NOTE(review): like {@link #putEtcdValueByKey(String, String)}, the
     * request is issued without awaiting its result.
     *
     * @param key configuration key
     */
    public static void deleteEtcdValueByKey(String key) {
        getEtcdClient().getKVClient().delete(ByteSequence.fromString(key));
    }

    /**
     * v3 API example: loads the {@code "test"} key once, then starts a
     * background thread that watches {@code "etcd_key"} and prints every
     * change event.
     *
     * <p>NOTE(review): the key that is loaded and the key that is watched
     * differ; confirm that this is intentional.
     *
     * <p>Failures are printed and not rethrown, so the method itself never throws.
     */
    public void init() {
        try {
            // Load the initial configuration.
            getConfig(getEtcdClient().getKVClient().get(ByteSequence.fromString("test")).get().getKvs());
            // Start the watcher thread.
            new Thread(EtcdUtil1::watchKeyChanges).start();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Blocks on the watcher for "etcd_key" and prints each event until the thread is interrupted.
    private static void watchKeyChanges() {
        Watch.Watcher watcher = getEtcdClient().getWatchClient().watch(ByteSequence.fromString("etcd_key"));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                watcher.listen().getEvents().forEach(watchEvent -> {
                    KeyValue kv = watchEvent.getKeyValue();
                    // Kind of change that happened.
                    System.out.println(watchEvent.getEventType());
                    // Key that changed; the new value is available via kv.getValue().toStringUtf8().
                    System.out.println(kv.getKey().toStringUtf8());
                });
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        } finally {
            // Release the watcher once the loop ends, whatever the reason.
            watcher.close();
        }
    }

    // Prints and returns the UTF-8 value of the first entry, or returns null when the list is empty.
    private String getConfig(List<KeyValue> kvs) {
        if (kvs.isEmpty()) {
            return null;
        }
        String config = kvs.get(0).getValue().toStringUtf8();
        System.out.println("etcd's config 's configValue is :" + config);
        return config;
    }
}