// DHTClient.java
package it.cnr.iit.utility.dht;
import jakarta.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static it.cnr.iit.utility.dht.DHTUtils.isDhtReachable;
/**
 * WebSocket client endpoint that sends logging payloads to the DHT and hands
 * incoming messages to an optional {@link MessageHandler}.
 *
 * <p>Both constructors try to connect immediately. A failed connection attempt
 * does not throw: it is logged and a reconnect attempt is scheduled to run
 * after {@value #RECONNECT_DELAY_SECONDS} seconds. The same happens whenever
 * the connection is closed.
 *
 * <p>NOTE(review): the reconnect executor is never shut down by this class, so
 * its worker thread lives as long as the instance is reachable — confirm that
 * this is acceptable for the embedding application.
 */
@ClientEndpoint
public class DHTClient {

    private static final Logger LOGGER = Logger.getLogger(DHTClient.class.getName());

    /** Endpoint used by the no-argument constructor. */
    private static final URI DEFAULT_URI = URI.create("ws://localhost:3000/ws");

    /** Delay, in seconds, before a scheduled reconnect attempt runs. */
    private static final long RECONNECT_DELAY_SECONDS = 5;

    // The fields below are written by the connecting thread / container callbacks
    // and read by the reconnect task and by sendMessage callers, hence volatile.
    volatile Session session = null;
    private volatile MessageHandler handler;
    private volatile boolean isLoggingEnabled;
    private volatile WebSocketContainer container;

    private final URI wsURI;
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** Guards {@link #reconnectPending}. */
    private final Object reconnectLock = new Object();
    /** True while a reconnect task is queued but has not started running yet. */
    private boolean reconnectPending;

    /**
     * Creates a client and tries to connect to the given endpoint.
     *
     * @param endpointURI WebSocket URI of the DHT; must not be {@code null}
     * @throws NullPointerException if {@code endpointURI} is {@code null}
     */
    public DHTClient(URI endpointURI) {
        if (endpointURI == null) {
            throw new NullPointerException("endpointURI must not be null");
        }
        this.wsURI = endpointURI;
        connect(wsURI);
    }

    /** Creates a client and tries to connect to {@code ws://localhost:3000/ws}. */
    public DHTClient() {
        this(DEFAULT_URI);
    }

    /**
     * Tries to open a session to {@code endpointURI}. On failure the cause is
     * logged and a reconnect is scheduled; nothing is propagated to the caller.
     */
    private void connect(URI endpointURI) {
        try {
            WebSocketContainer wsContainer = ContainerProvider.getWebSocketContainer();
            container = wsContainer;
            session = wsContainer.connectToServer(this, endpointURI);
        } catch (Exception e) {
            // Broad catch is deliberate: connecting is best-effort and retried later.
            LOGGER.log(Level.WARNING, "Unable to connect to " + endpointURI, e);
            scheduleReconnect();
        }
        isLoggingEnabled = true;
    }

    /** Stores the freshly opened session and reports the connection. */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        System.out.println("Established websocket connection at " + wsURI);
        System.out.println();
    }

    /**
     * Logs the error and notifies the registered handler, if there is one.
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        String sessionId = (session != null) ? session.getId() : "<unknown>";
        LOGGER.log(Level.WARNING, "There was an error for session: " + sessionId, throwable);
        MessageHandler current = handler;
        if (current != null) {
            current.handleError();
        }
    }

    /**
     * Registers the handler that receives incoming messages and error
     * notifications, replacing any previously registered handler.
     */
    public void addMessageHandler(MessageHandler msgHandler) {
        this.handler = msgHandler;
    }

    /**
     * Forwards an incoming text message to the registered handler. Messages
     * that arrive before a handler has been registered are dropped.
     */
    @OnMessage
    public void onMessage(String message) {
        MessageHandler current = handler;
        if (current != null) {
            current.handleMessage(message);
        }
    }

    /** Reports the closure and schedules a reconnect attempt. */
    @OnClose
    public void onClose(Session session, CloseReason closeReason) {
        System.out.println("WebSocket connection closed: " + closeReason.getReasonPhrase());
        scheduleReconnect();
    }

    /**
     * Queues a single reconnect attempt. Calls made while an attempt is already
     * queued are ignored, so repeated failures do not pile up tasks.
     */
    private void scheduleReconnect() {
        synchronized (reconnectLock) {
            if (reconnectPending) {
                return;
            }
            reconnectPending = true;
        }
        executor.schedule(() -> {
            // Clear the flag first so a failing connect() below can queue the next attempt.
            synchronized (reconnectLock) {
                reconnectPending = false;
            }
            Session current = session;
            // session is still null when the very first connect failed; treat that as "not open".
            if (current == null || !current.isOpen()) {
                System.out.println("WebSocket reconnecting...");
                connect(wsURI);
            }
        }, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Sends a logging message to the DHT.
     *
     * @param jsonOut payload to send as a text message
     * @return {@code true} if the payload was handed to the session;
     *         {@code false} if logging is disabled, no open session is
     *         available, or sending failed with an {@link IOException}
     */
    public boolean sendMessage(String jsonOut) {
        // Return if DHT logging is not used
        if (!isLoggingEnabled) {
            return false;
        }
        // If a connection is not established yet (which should
        // have been done from the application), do it now
        if (container == null || session == null) {
            try {
                connect(wsURI);
            } catch (RuntimeException e) {
                return false;
            }
        }
        // connect() swallows failures, so the session may still be missing or closed
        Session current = session;
        if (current == null || !current.isOpen()) {
            return false;
        }
        // Now send the payload to the DHT
        try {
            current.getBasicRemote().sendText(jsonOut);
        } catch (IOException e) {
            LOGGER.log(Level.SEVERE, "Error: Sending logging payload to DHT failed", e);
            return false;
        }
        return true;
    }

    /** Callback interface for messages and errors received on the connection. */
    public interface MessageHandler {
        /** Called with each text message received from the server. */
        void handleMessage(String message);

        /** Called when the WebSocket reports an error. */
        void handleError();
    }
}