DHTPersistentMessageClient.java
package it.cnr.iit.utility.dht;
import com.google.gson.GsonBuilder;
import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;
import it.cnr.iit.utility.dht.jsonpersistent.*;
import jakarta.websocket.*;
import java.net.URI;
@ClientEndpoint
public class DHTPersistentMessageClient {
private final Object lock = new Object();
private String response;
private Session session;
private final String uri;
private String topicName;
private String topicUuid;
/**
* Type adapter factory which has to be initialized with the right class before
* calling the constructor. The class registered at this type adapter factory
* must implement RequestPostTopicUuid.
*/
private RuntimeTypeAdapterFactory<RequestPostTopicUuid> typeFactory;
/**
* Type adapter factory which has to be initialized with the right class before
* calling the constructor. The class registered at this type adapter factory
* must implement Persistent.
*/
private RuntimeTypeAdapterFactory<Persistent> persistentTypeFactory;
public DHTPersistentMessageClient(String uri) {
this.uri = uri;
}
public DHTPersistentMessageClient(String uri, String topicName, String topicUuid,
RuntimeTypeAdapterFactory<RequestPostTopicUuid> typeFactory,
RuntimeTypeAdapterFactory<Persistent> persistentTypeFactory) {
if (topicName == null || uri == null) {
throw new RuntimeException("Topic name and URI cannot be null");
}
this.uri = uri;
this.topicName = topicName;
this.topicUuid = topicUuid;
this.typeFactory = typeFactory;
this.persistentTypeFactory = persistentTypeFactory;
}
@OnOpen
public void onOpen(Session session) {
this.session = session; // Store the session for later use
// System.out.println("WebSocket connected");
}
/**
* Try to deserialize the message as a JsonInResponse2RequestGetTopicUuid. If this
* fails, return false.
* Otherwise, extract the topicName and topicUuid. If they both match, return true.
* @param message the json string
* @return true if, after deserialization, topicName and topicUuid match the local
* topicName and topicUuid fields. False otherwise.
*/
private boolean isExpectedResponse2RequestGetTopicUuid(String message) {
if (this.topicUuid == null) {
return false;
}
try {
JsonInResponse2RequestGetTopicUuid jsonInResponse = new GsonBuilder()
.registerTypeAdapterFactory(typeFactory)
.create()
.fromJson(message, JsonInResponse2RequestGetTopicUuid.class);
String topicName = jsonInResponse.getResponse().getValue().getTopic_name();
String topicUuid = jsonInResponse.getResponse().getValue().getTopic_uuid();
return this.topicName.equals(topicName) && this.topicUuid.equals(topicUuid);
} catch (Exception e) {
return false;
}
}
/**
* Try to deserialize the message as a JsonInResponse2RequestGetTopicName. If this
* fails, return false.
* Otherwise, extract the topicName. If topicName matches, return true.
* @param message the json string
* @return true if, after deserialization, topicName matches the local topicName.
* False otherwise.
*/
private boolean isExpectedResponse2RequestGetTopicName(String message) {
try {
JsonInResponse2RequestGetTopicName jsonInResponse = new GsonBuilder()
.registerTypeAdapterFactory(typeFactory)
.create()
.fromJson(message, JsonInResponse2RequestGetTopicName.class);
String topicName = jsonInResponse.getResponse().getValue().get(0).getTopic_name();
return this.topicName.equals(topicName);
} catch (Exception e) {
return false;
}
}
/**
* Try to deserialize the message as a jsonInPersistent. If this fails, return false.
* Otherwise, extract the topicName and topicUuid. If they both match, return true.
* @param message the json string
* @return true if, after deserialization, topicName and topicUuid match the local
* topicName and topicUuid fields. False otherwise.
*/
private boolean isExpectedPersistent(String message) {
if (this.topicUuid == null) {
return false;
}
try {
JsonInPersistent jsonInPersistent = new GsonBuilder()
.registerTypeAdapterFactory(persistentTypeFactory)
.create()
.fromJson(message, JsonInPersistent.class);
String topicName = jsonInPersistent.getPersistent().getTopic_name();
String topicUuid = jsonInPersistent.getPersistent().getTopic_uuid();
return this.topicName.equals(topicName) && this.topicUuid.equals(topicUuid);
} catch (Exception e) {
return false;
}
}
/**
* This is an empty message, and it could be the right answer
* to our request. This happens when no messages for the topic
* name and topic uuid are found in the DHT.
* However, we cannot know with certainty that this is the answer
* to our request.
* Therefore, to minimize false positives, we return this string
* to the caller, which should make another request equal to the
* previous one. If, for a number of attempts, the caller receives
* the empty response string, it should assume that the DHT
* contains no records for the given topic name and uuid.
* @param message the json string
* @return true if the input matches the empty response, false otherwise.
*/
private boolean isEmptyResponse(String message) {
return message.equals("{\"Response\":{\"value\":{}}}") || message.equals("{\"Response\":{\"value\":[]}}");
}
// todo (?): implement a mechanism that calls the lock.notify() after
// receiving too many "wrong" messages
@OnMessage
public void onMessage(String message, Session session) {
String truncatedMessage;
if (message.length() > 100) {
truncatedMessage = message.substring(0,99) + "... (remainder omitted for readability)";
} else {
truncatedMessage = message;
}
if (!(isExpectedResponse2RequestGetTopicUuid(message)
|| isExpectedResponse2RequestGetTopicName(message)
|| isExpectedPersistent(message))
&& !isEmptyResponse(message)) {
// discard message
// System.out.println("DHTPersistentMessageClient: message discarded: " + truncatedMessage);
return;
}
else {
System.out.print("DHTPersistentMessageClient: " +
"Received response for topicName: " + topicName);
if (topicUuid != null) {
System.out.println(", topicUuid: " + topicUuid);
} else {
System.out.println();
}
}
System.out.println("Received response: " + truncatedMessage);
response = message;
synchronized (lock) {
lock.notify(); // Notify the waiting thread that the response has been received.
}
}
@OnClose
public void onClose(Session session) {
// System.out.println("Session " + session.getId() + " closed.");
}
public void closeConnection() {
try {
if (session != null) {
session.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public String sendRequestAndWaitForResponse(String request) {
try {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(this, new URI(this.uri));
synchronized (lock) {
this.session.getBasicRemote().sendText(request); // Send the request using the stored session
lock.wait(); // Wait for the response to be received
}
return response;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getTopicUuid() {
return topicUuid;
}
public void setTopicUuid(String topicUuid) {
this.topicUuid = topicUuid;
}
}