# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS.  All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""Evaluation score abstract class and implementations.
"""

from __future__ import division
import logging
import os
import re
import subprocess
import sys

try:
    import numpy as np
except ImportError:
    logging.critical('Cannot import the third-party Python package numpy')
    sys.exit(1)

from . import data_access
from . import exceptions
from . import signal_processing


class EvaluationScore(object):

    NAME = None
    REGISTERED_CLASSES = {}

    def __init__(self, score_filename_prefix):
        self._score_filename_prefix = score_filename_prefix
        self._input_signal_metadata = None
        self._reference_signal = None
        self._reference_signal_filepath = None
        self._tested_signal = None
        self._tested_signal_filepath = None
        self._output_filepath = None
        self._score = None
        self._render_signal_filepath = None

    @classmethod
    def RegisterClass(cls, class_to_register):
        """Registers an EvaluationScore implementation.

    Decorator to automatically register the classes that extend EvaluationScore.
    Example usage:

    @EvaluationScore.RegisterClass
    class AudioLevelScore(EvaluationScore):
      pass
    """
        cls.REGISTERED_CLASSES[class_to_register.NAME] = class_to_register
        return class_to_register

    @property
    def output_filepath(self):
        return self._output_filepath

    @property
    def score(self):
        return self._score

    def SetInputSignalMetadata(self, metadata):
        """Sets input signal metadata.

    Args:
      metadata: dict instance.
    """
        self._input_signal_metadata = metadata

    def SetReferenceSignalFilepath(self, filepath):
        """Sets the path to the audio track used as reference signal.

    Args:
      filepath: path to the reference audio track.
    """
        self._reference_signal_filepath = filepath

    def SetTestedSignalFilepath(self, filepath):
        """Sets the path to the audio track used as test signal.

    Args:
      filepath: path to the test audio track.
    """
        self._tested_signal_filepath = filepath

    def SetRenderSignalFilepath(self, filepath):
        """Sets the path to the audio track used as render signal.

    Args:
      filepath: path to the test audio track.
    """
        self._render_signal_filepath = filepath

    def Run(self, output_path):
        """Extracts the score for the set test data pair.

    Args:
      output_path: path to the directory where the output is written.
    """
        self._output_filepath = os.path.join(
            output_path, self._score_filename_prefix + self.NAME + '.txt')
        try:
            # If the score has already been computed, load.
            self._LoadScore()
            logging.debug('score found and loaded')
        except IOError:
            # Compute the score.
            logging.debug('score not found, compute')
            self._Run(output_path)

    def _Run(self, output_path):
        # Abstract method.
        raise NotImplementedError()

    def _LoadReferenceSignal(self):
        assert self._reference_signal_filepath is not None
        self._reference_signal = signal_processing.SignalProcessingUtils.LoadWav(
            self._reference_signal_filepath)

    def _LoadTestedSignal(self):
        assert self._tested_signal_filepath is not None
        self._tested_signal = signal_processing.SignalProcessingUtils.LoadWav(
            self._tested_signal_filepath)

    def _LoadScore(self):
        return data_access.ScoreFile.Load(self._output_filepath)

    def _SaveScore(self):
        return data_access.ScoreFile.Save(self._output_filepath, self._score)


@EvaluationScore.RegisterClass
class AudioLevelPeakScore(EvaluationScore):
    """Peak audio level score.

  Defined as the difference between the peak audio level of the tested and
  the reference signals.

  Unit: dB
  Ideal: 0 dB
  Worst case: +/-inf dB
  """

    NAME = 'audio_level_peak'

    def __init__(self, score_filename_prefix):
        EvaluationScore.__init__(self, score_filename_prefix)

    def _Run(self, output_path):
        self._LoadReferenceSignal()
        self._LoadTestedSignal()
        self._score = self._tested_signal.dBFS - self._reference_signal.dBFS
        self._SaveScore()


@EvaluationScore.RegisterClass
class MeanAudioLevelScore(EvaluationScore):
    """Mean audio level score.

  Defined as the difference between the mean audio level of the tested and
  the reference signals.

  Unit: dB
  Ideal: 0 dB
  Worst case: +/-inf dB
  """

    NAME = 'audio_level_mean'

    def __init__(self, score_filename_prefix):
        EvaluationScore.__init__(self, score_filename_prefix)

    def _Run(self, output_path):
        self._LoadReferenceSignal()
        self._LoadTestedSignal()

        dbfs_diffs_sum = 0.0
        seconds = min(len(self._tested_signal), len(
            self._reference_signal)) // 1000
        for t in range(seconds):
            t0 = t * seconds
            t1 = t0 + seconds
            dbfs_diffs_sum += (self._tested_signal[t0:t1].dBFS -
                               self._reference_signal[t0:t1].dBFS)
        self._score = dbfs_diffs_sum / float(seconds)
        self._SaveScore()


@EvaluationScore.RegisterClass
class EchoMetric(EvaluationScore):
    """Echo score.

  Proportion of detected echo.

  Unit: ratio
  Ideal: 0
  Worst case: 1
  """

    NAME = 'echo_metric'

    def __init__(self, score_filename_prefix, echo_detector_bin_filepath):
        EvaluationScore.__init__(self, score_filename_prefix)

        # POLQA binary file path.
        self._echo_detector_bin_filepath = echo_detector_bin_filepath
        if not os.path.exists(self._echo_detector_bin_filepath):
            logging.error('cannot find EchoMetric tool binary file')
            raise exceptions.FileNotFoundError()

        self._echo_detector_bin_path, _ = os.path.split(
            self._echo_detector_bin_filepath)

    def _Run(self, output_path):
        echo_detector_out_filepath = os.path.join(output_path,
                                                  'echo_detector.out')
        if os.path.exists(echo_detector_out_filepath):
            os.unlink(echo_detector_out_filepath)

        logging.debug("Render signal filepath: %s",
                      self._render_signal_filepath)
        if not os.path.exists(self._render_signal_filepath):
            logging.error(
                "Render input required for evaluating the echo metric.")

        args = [
            self._echo_detector_bin_filepath, '--output_file',
            echo_detector_out_filepath, '--', '-i',
            self._tested_signal_filepath, '-ri', self._render_signal_filepath
        ]
        logging.debug(' '.join(args))
        subprocess.call(args, cwd=self._echo_detector_bin_path)

        # Parse Echo detector tool output and extract the score.
        self._score = self._ParseOutputFile(echo_detector_out_filepath)
        self._SaveScore()

    @classmethod
    def _ParseOutputFile(cls, echo_metric_file_path):
        """
    Parses the POLQA tool output formatted as a table ('-t' option).

    Args:
      polqa_out_filepath: path to the POLQA tool output file.

    Returns:
      The score as a number in [0, 1].
    """
        with open(echo_metric_file_path) as f:
            return float(f.read())


@EvaluationScore.RegisterClass
class PolqaScore(EvaluationScore):
    """POLQA score.

  See http://www.polqa.info/.

  Unit: MOS
  Ideal: 4.5
  Worst case: 1.0
  """

    NAME = 'polqa'

    def __init__(self, score_filename_prefix, polqa_bin_filepath):
        EvaluationScore.__init__(self, score_filename_prefix)

        # POLQA binary file path.
        self._polqa_bin_filepath = polqa_bin_filepath
        if not os.path.exists(self._polqa_bin_filepath):
            logging.error('cannot find POLQA tool binary file')
            raise exceptions.FileNotFoundError()

        # Path to the POLQA directory with binary and license files.
        self._polqa_tool_path, _ = os.path.split(self._polqa_bin_filepath)

    def _Run(self, output_path):
        polqa_out_filepath = os.path.join(output_path, 'polqa.out')
        if os.path.exists(polqa_out_filepath):
            os.unlink(polqa_out_filepath)

        args = [
            self._polqa_bin_filepath,
            '-t',
            '-q',
            '-Overwrite',
            '-Ref',
            self._reference_signal_filepath,
            '-Test',
            self._tested_signal_filepath,
            '-LC',
            'NB',
            '-Out',
            polqa_out_filepath,
        ]
        logging.debug(' '.join(args))
        subprocess.call(args, cwd=self._polqa_tool_path)

        # Parse POLQA tool output and extract the score.
        polqa_output = self._ParseOutputFile(polqa_out_filepath)
        self._score = float(polqa_output['PolqaScore'])

        self._SaveScore()

    @classmethod
    def _ParseOutputFile(cls, polqa_out_filepath):
        """
    Parses the POLQA tool output formatted as a table ('-t' option).

    Args:
      polqa_out_filepath: path to the POLQA tool output file.

    Returns:
      A dict.
    """
        data = []
        with open(polqa_out_filepath) as f:
            for line in f:
                line = line.strip()
                if len(line) == 0 or line.startswith('*'):
                    # Ignore comments.
                    continue
                # Read fields.
                data.append(re.split(r'\t+', line))

        # Two rows expected (header and values).
        assert len(data) == 2, 'Cannot parse POLQA output'
        number_of_fields = len(data[0])
        assert number_of_fields == len(data[1])

        # Build and return a dictionary with field names (header) as keys and the
        # corresponding field values as values.
        return {
            data[0][index]: data[1][index]
            for index in range(number_of_fields)
        }


@EvaluationScore.RegisterClass
class TotalHarmonicDistorsionScore(EvaluationScore):
    """Total harmonic distorsion plus noise score.

  Total harmonic distorsion plus noise score.
  See "https://en.wikipedia.org/wiki/Total_harmonic_distortion#THD.2BN".

  Unit: -.
  Ideal: 0.
  Worst case: +inf
  """

    NAME = 'thd'

    def __init__(self, score_filename_prefix):
        EvaluationScore.__init__(self, score_filename_prefix)
        self._input_frequency = None

    def _Run(self, output_path):
        self._CheckInputSignal()

        self._LoadTestedSignal()
        if self._tested_signal.channels != 1:
            raise exceptions.EvaluationScoreException(
                'unsupported number of channels')
        samples = signal_processing.SignalProcessingUtils.AudioSegmentToRawData(
            self._tested_signal)

        # Init.
        num_samples = len(samples)
        duration = len(self._tested_signal) / 1000.0
        scaling = 2.0 / num_samples
        max_freq = self._tested_signal.frame_rate / 2
        f0_freq = float(self._input_frequency)
        t = np.linspace(0, duration, num_samples)

        # Analyze harmonics.
        b_terms = []
        n = 1
        while f0_freq * n < max_freq:
            x_n = np.sum(
                samples * np.sin(2.0 * np.pi * n * f0_freq * t)) * scaling
            y_n = np.sum(
                samples * np.cos(2.0 * np.pi * n * f0_freq * t)) * scaling
            b_terms.append(np.sqrt(x_n**2 + y_n**2))
            n += 1

        output_without_fundamental = samples - b_terms[0] * np.sin(
            2.0 * np.pi * f0_freq * t)
        distortion_and_noise = np.sqrt(
            np.sum(output_without_fundamental**2) * np.pi * scaling)

        # TODO(alessiob): Fix or remove if not needed.
        # thd = np.sqrt(np.sum(b_terms[1:]**2)) / b_terms[0]

        # TODO(alessiob): Check the range of `thd_plus_noise` and update the class
        # docstring above if accordingly.
        thd_plus_noise = distortion_and_noise / b_terms[0]

        self._score = thd_plus_noise
        self._SaveScore()

    def _CheckInputSignal(self):
        # Check input signal and get properties.
        try:
            if self._input_signal_metadata['signal'] != 'pure_tone':
                raise exceptions.EvaluationScoreException(
                    'The THD score requires a pure tone as input signal')
            self._input_frequency = self._input_signal_metadata['frequency']
            if self._input_signal_metadata[
                    'test_data_gen_name'] != 'identity' or (
                        self._input_signal_metadata['test_data_gen_config'] !=
                        'default'):
                raise exceptions.EvaluationScoreException(
                    'The THD score cannot be used with any test data generator other '
                    'than "identity"')
        except TypeError:
            raise exceptions.EvaluationScoreException(
                'The THD score requires an input signal with associated metadata'
            )
        except KeyError:
            raise exceptions.EvaluationScoreException(
                'Invalid input signal metadata to compute the THD score')
