import {signUrl} from './sign-url';
import {connect} from 'async-mqtt';
import {EventEmitter} from 'events';

/**
 * Self-healing wrapper around an MQTT client.
 *
 * A fresh underlying client is created through `socketFactory` for every
 * connection attempt (the client's own reconnect logic is disabled with
 * `reconnectPeriod: 0`). When the underlying client reports any of the
 * disconnect events it is ended and replaced, and every topic registered
 * through `subscribe()` is subscribed again on the replacement.
 *
 * Emits `connect` every time a client has connected and all tracked topics
 * have been (re-)subscribed.
 */
export default class AwsIoTMqttClient extends EventEmitter {
  #awsIoTEndpoint;
  #clientIdGenerator;
  #onMessage;
  #credentials;
  #urlSigner;
  #socketFactory;
  #reconnectPeriodMS;
  #mqttClient;
  #subscribedTo = new Set();
  #connected = false;
  #disconnectEvents = ['close', 'disconnect', 'offline', 'error'];
  // One promise shared by all pending waitForConnection() callers, so that
  // waiting never piles up 'connect' listeners on this emitter.
  #pendingConnection;
  #resolvePendingConnection;

  /**
   * Stores the collaborators and starts the first connection attempt in the
   * background; use `waitForConnection()` to know when it has succeeded.
   *
   * @param {*} awsIoTEndpoint - Passed as first argument to `urlSigner`.
   * @param {Function} clientIdGenerator - Called without arguments for every
   *   new underlying client; its result is used as the `clientId` option.
   * @param {Function} onMessage - Registered as `message` listener on every
   *   underlying client once it is connected and subscribed.
   * @param {{refreshPromise: Function}} credentials - `refreshPromise()` is
   *   awaited before each connection attempt; the object itself is passed as
   *   second argument to `urlSigner`.
   * @param {Function} [urlSigner=signUrl] - `(endpoint, credentials) => url`.
   * @param {Function} [socketFactory=connect] - `(url, options) => client`.
   * @param {number} [reconnectPeriodMS=2000] - Delay in milliseconds applied
   *   before every connection attempt, including the first one.
   */
  constructor(
    awsIoTEndpoint,
    clientIdGenerator,
    onMessage,
    credentials,
    urlSigner = signUrl,
    socketFactory = connect,
    reconnectPeriodMS = 2_000,
  ) {
    super();
    this.#awsIoTEndpoint = awsIoTEndpoint;
    this.#clientIdGenerator = clientIdGenerator;
    this.#onMessage = onMessage;
    this.#credentials = credentials;
    this.#urlSigner = urlSigner;
    this.#socketFactory = socketFactory;
    this.#reconnectPeriodMS = reconnectPeriodMS;
    // A constructor cannot await; make sure the background attempt can never
    // surface as an unhandled rejection.
    this._connect().catch(console.error);
  }

  /**
   * Resolves once the wrapper is connected; immediately if it already is.
   *
   * @returns {Promise<void>}
   */
  async waitForConnection() {
    if (this.#connected) {
      return;
    }
    if (!this.#pendingConnection) {
      this.#pendingConnection = new Promise(resolve => {
        this.#resolvePendingConnection = resolve;
      });
    }
    await this.#pendingConnection;
  }

  /**
   * Waits for a connection, remembers `topic` so it is re-subscribed after
   * every reconnect, and subscribes to it with QoS 1.
   *
   * @param {string} topic
   * @returns {Promise<void>} Never rejects because of the subscribe call
   *   itself; such failures are only logged.
   */
  async subscribe(topic) {
    await this.waitForConnection();
    this.#subscribedTo.add(topic);
    // Failures are only logged: connection-level problems are handled by the
    // disconnect listeners installed in _recreateClient.
    await this.#mqttClient.subscribe(topic, {qos: 1}).catch(console.error);
  }

  /**
   * Waits for a connection, stops tracking `topic` and unsubscribes from it.
   *
   * @param {string} topic
   * @returns {Promise<void>} Never rejects because of the unsubscribe call
   *   itself; such failures are only logged.
   */
  async unsubscribe(topic) {
    await this.waitForConnection();
    this.#subscribedTo.delete(topic);
    // Failures are only logged: connection-level problems are handled by the
    // disconnect listeners installed in _recreateClient.
    await this.#mqttClient.unsubscribe(topic).catch(console.error);
  }

  /**
   * Ends the current underlying client (if any), refreshes the credentials
   * and creates a new client. Credential or client-creation failures are
   * logged and retried after another `reconnectPeriodMS` delay instead of
   * aborting the reconnect loop.
   *
   * @returns {Promise<void>} Resolves once the wrapper is connected.
   */
  async _connect() {
    const pause = () =>
      new Promise(resolve => setTimeout(resolve, this.#reconnectPeriodMS));

    // Each call to _connect is subject to the reconnect delay
    await pause();
    try {
      await this.#mqttClient?.end();
    } catch (err) {
      // Closing the old client is best effort; it must not block reconnecting
      console.error(err);
    }

    for (;;) {
      try {
        await this.#credentials.refreshPromise();
        this._recreateClient();
        break;
      } catch (err) {
        console.error(err);
        await pause();
      }
    }
    await this.waitForConnection();
  }

  /**
   * Creates a new underlying client from a freshly signed URL and wires up
   * its disconnect and connect handling.
   *
   * If re-subscribing fails after the client connected, the error is logged
   * and no `connect` event is emitted; recovery then depends on the client
   * reporting one of the disconnect events.
   */
  _recreateClient() {
    const requestUrl = this.#urlSigner(this.#awsIoTEndpoint, this.#credentials);
    const client = this.#socketFactory(requestUrl, {
      clientId: this.#clientIdGenerator(),
      reconnectPeriod: 0,
    });
    this.#mqttClient = client;

    // False as soon as this particular client reported a disconnection
    let isLive = true;

    const onDisconnection = () => {
      if (!isLive) {
        return;
      }
      isLive = false;
      // Prevent this callback from being called multiple times
      this.#disconnectEvents.forEach(event =>
        client.removeListener(event, onDisconnection),
      );
      // An EventEmitter throws when 'error' is emitted without a listener.
      // The discarded client may still report errors while it is being shut
      // down, so keep a logging listener on it.
      client.on('error', err => console.error(err));
      this.#connected = false;
      this._connect().catch(console.error);
    };

    this.#disconnectEvents.forEach(event =>
      client.once(event, onDisconnection),
    );

    client.once('connect', async () => {
      // Client connected. Subscribe to topics we are interested in
      try {
        await Promise.all(
          [...this.#subscribedTo].map(topic =>
            client.subscribe(topic, {qos: 1}),
          ),
        );
      } catch (err) {
        console.error(err);
        return;
      }
      if (!isLive) {
        // The client dropped while we were subscribing; a replacement is
        // already on its way and will announce itself.
        return;
      }
      // Client successfully connected and subscribed to all topics requested
      // Set up handlers and resolve promises
      client.on('message', this.#onMessage);
      this.#connected = true;
      const notifyWaiters = this.#resolvePendingConnection;
      this.#pendingConnection = undefined;
      this.#resolvePendingConnection = undefined;
      notifyWaiters?.();
      this.emit('connect');
    });
  }
}
