DHTUtils.java

package it.cnr.iit.utility.dht;

import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;
import it.cnr.iit.ucs.constants.PURPOSE;
import it.cnr.iit.utility.dht.jsonvolatile.*;
import it.cnr.iit.utility.dht.jsonvolatile.addpip.AddPipRequest;
import it.cnr.iit.utility.dht.jsonvolatile.addpip.AddPipResponse;
import it.cnr.iit.utility.dht.jsonvolatile.addpolicy.AddPolicyRequest;
import it.cnr.iit.utility.dht.jsonvolatile.addpolicy.AddPolicyResponse;
import it.cnr.iit.utility.dht.jsonvolatile.deletepolicy.DeletePolicyRequest;
import it.cnr.iit.utility.dht.jsonvolatile.deletepolicy.DeletePolicyResponse;
import it.cnr.iit.utility.dht.jsonvolatile.endaccess.EndAccessRequest;
import it.cnr.iit.utility.dht.jsonvolatile.endaccess.EndAccessResponse;
import it.cnr.iit.utility.dht.jsonvolatile.error.ErrorResponse;
import it.cnr.iit.utility.dht.jsonvolatile.getpolicy.GetPolicyRequest;
import it.cnr.iit.utility.dht.jsonvolatile.getpolicy.GetPolicyResponse;
import it.cnr.iit.utility.dht.jsonvolatile.listpolicies.ListPoliciesRequest;
import it.cnr.iit.utility.dht.jsonvolatile.listpolicies.ListPoliciesResponse;
import it.cnr.iit.utility.dht.jsonvolatile.reevaluation.ReevaluationResponse;
import it.cnr.iit.utility.dht.jsonvolatile.registration.RegisterRequest;
import it.cnr.iit.utility.dht.jsonvolatile.registration.RegisterResponse;
import it.cnr.iit.utility.dht.jsonvolatile.startaccess.StartAccessRequest;
import it.cnr.iit.utility.dht.jsonvolatile.startaccess.StartAccessResponse;
import it.cnr.iit.utility.dht.jsonvolatile.tryaccess.TryAccessRequest;
import it.cnr.iit.utility.dht.jsonvolatile.tryaccess.TryAccessResponse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

public class DHTUtils {

    private static final RuntimeTypeAdapterFactory<MessageContent> typeFactory = RuntimeTypeAdapterFactory
            .of(MessageContent.class, "purpose")
            .registerSubtype(RegisterRequest.class, PURPOSE.REGISTER.name())
            .registerSubtype(RegisterResponse.class, PURPOSE.REGISTER_RESPONSE.name())
            .registerSubtype(TryAccessRequest.class, PURPOSE.TRY.name())
            .registerSubtype(TryAccessResponse.class, PURPOSE.TRY_RESPONSE.name())
            .registerSubtype(StartAccessRequest.class, PURPOSE.START.name())
            .registerSubtype(StartAccessResponse.class, PURPOSE.START_RESPONSE.name())
            .registerSubtype(EndAccessRequest.class, PURPOSE.END.name())
            .registerSubtype(EndAccessResponse.class, PURPOSE.END_RESPONSE.name())
            .registerSubtype(ReevaluationResponse.class, PURPOSE.REEVALUATION_RESPONSE.name())
            .registerSubtype(AddPolicyRequest.class, PURPOSE.ADD_POLICY.name())
            .registerSubtype(AddPolicyResponse.class, PURPOSE.ADD_POLICY_RESPONSE.name())
            .registerSubtype(DeletePolicyRequest.class, PURPOSE.DELETE_POLICY.name())
            .registerSubtype(DeletePolicyResponse.class, PURPOSE.DELETE_POLICY_RESPONSE.name())
            .registerSubtype(ListPoliciesRequest.class, PURPOSE.LIST_POLICIES.name())
            .registerSubtype(ListPoliciesResponse.class, PURPOSE.LIST_POLICIES_RESPONSE.name())
            .registerSubtype(GetPolicyRequest.class, PURPOSE.GET_POLICY.name())
            .registerSubtype(GetPolicyResponse.class, PURPOSE.GET_POLICY_RESPONSE.name())
            .registerSubtype(AddPipRequest.class, PURPOSE.ADD_PIP.name())
            .registerSubtype(AddPipResponse.class, PURPOSE.ADD_PIP_RESPONSE.name())
            .registerSubtype(ErrorResponse.class, PURPOSE.ERROR_RESPONSE.name());


    /**
     * Build a JsonOut object
     *
     * @param message     the object containing a request or a response
     * @param id          the name of the entity communicating with the UCS
     * @param topic_name  the name of the topic
     * @param topic_uuid  the unique identifier of the topic
     * @param commandType the type of command
     * @return the object to be then serialized and sent to the DHT
     */
    public static JsonOut buildOutgoingJsonObject(MessageContent message, String id,
                                                  String topic_name, String topic_uuid,
                                                  String commandType) {
        InnerValue innerValue =
                new InnerValue(
                        message,
                        id,
                        topic_name,
                        topic_uuid);

        Command command = new Command(commandType, innerValue);

        // set timestamp
        OuterValue outerValue = new OuterValue(System.currentTimeMillis(), command);

        RequestPubMessage pubMsg = new RequestPubMessage(outerValue);

        return new JsonOut(pubMsg);
    }

    /**
     * Serialize a JsonOut object to a Json string
     * format accepted by the DHT
     *
     * @param jsonOut the object to be serialized
     * @return the Json string to send to the DHT
     */
    public static String serializeOutgoingJson(JsonOut jsonOut) {

        String outgoing = new GsonBuilder()
                .disableHtmlEscaping()
                .serializeNulls()
                .create()
                .toJson(jsonOut);

        String messageClass = jsonOut
                .getRequestPubMessage()
                .getValue()
                .getCommand()
                .getValue()
                .getMessage()
                .getClass()
                .getSimpleName();

        System.out.println("Sending " + messageClass + " message:");
        System.out.println(new GsonBuilder()
                .disableHtmlEscaping()
                .serializeNulls()
                .setPrettyPrinting()
                .create()
                .toJson(jsonOut));
        return outgoing;
    }


    /**
     * Deserialize the Json string coming from the DHT
     *
     * @param message The Json string, as received from the DHT
     * @return the object deserialized from the Json string
     */
    public static JsonIn deserializeIncomingJson(String message) {
        return new GsonBuilder()
                .registerTypeAdapterFactory(typeFactory)
                .create().fromJson(message, JsonIn.class);
    }

    /**
     * Check if the topic_uuid contained in the deserialized
     * received message is the one this entity is subscribed to,
     * as specified in the 'topic' argument
     *
     * @param jsonIn the JsonIn object, deserialized from the
     *               Json received from the DHT
     * @param topic  the topic this entity is subscribed to
     * @return true if the topic matches with the one thi
     * is subscribed to. False otherwise.
     */
    public static boolean isTopicOfInterest(JsonIn jsonIn, String topic) {
        //System.out.println("Topic does not match the one we are subscribed to: " +
        //        "Message discarded.");
        try {
            return jsonIn
                    .getVolatile()
                    .getValue()
                    .getCommand()
                    .getValue()
                    .getTopic_uuid()
                    .equals(topic);
        } catch (Exception e) {
            throw new JsonSyntaxException(e.getMessage());
        }
    }


    /**
     * Wait for a connection to the DHT before proceeding
     *
     * @param dhtWebsocketUri the URI of the WebSocket interface for the DHT
     * @return true when the connection succeeds
     */
    public static boolean isDhtReachable(String dhtWebsocketUri, int timeout, int attempts) {

        Socket socket = null;
        URI dhtUri = URI.create(dhtWebsocketUri);

        long connectionTime = System.currentTimeMillis();
        for (int i = 0; i < attempts; i++) {
            try {
                connectionTime = System.currentTimeMillis();
                socket = new Socket();
                socket.connect(new InetSocketAddress(dhtUri.getHost(), dhtUri.getPort()), timeout);
                break;
            } catch (Exception e) {
                System.err.println("Failed: Attempt " + (i + 1) + " of " + attempts + " to reach DHT at: " + dhtWebsocketUri);
                if (i == attempts - 1) {
                    return false;
                }
                try {
                    long currentTime = System.currentTimeMillis();
                    long timeLeftToSleep = connectionTime + timeout - currentTime > 0 ?
                            connectionTime + timeout - currentTime : 0L;

                    Thread.sleep(timeLeftToSleep);
                } catch (InterruptedException ex) {
                    System.err.println("Failed to sleep while waiting for the next attempt");
                }
            }
        }
        try {
            assert socket != null;
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }
}