// vendor
import debug from 'debug'
import { isEmpty } from 'lodash'

import { Observable, Subject } from 'rxjs'

// Export
// ---------------------------------------------------------------------------

/**
 * `export frequencyBuffer :: Observable a -> Object -> Observable a`
 *
 * ```js
 * import { frequencyBuffer } from 'components/__utils/frequency'
 *
 *
 * const webSocketObservable = ...
 * const buffered = frequencyCache(webSocketObservable)
 *
 * buffered.subscribe()
 * ```
 *
 * frequencyBuffer needs to be passed a multicast Observable, or weird things
 * will result. @see: http://bit.ly/2fa6AUO
 */
export default function frequencyBuffer(source, config = {}) {
  // @TODO: Create multicast observable inside this operator, rather than
  // passing it a multicast. as simple as scoping `source` like
  // `const _source = source.share()`
  const {
    cooldownThresh = 2,
    count = 1000,
    floodThresh = 4,
    id = null,
    time = 1000,
  } = config

  const log = debug(`VO:utils/frequency${id ? `/${id}` : ''}`)

  return Observable.create(function(observer) {
    let cooldown = 0
    let flood = false
    let thresh = 0

    const _flush = new Subject()

    // Check to see if we're backpressured. If not, trigger a flush.
    const timer = setInterval(function flush() {
      if (!flood) {
        _flush.next('flushing by frequency')
      }
    }, 666)

    // Always flush once a certain count is reached. This makes sure that
    // the observable is not indefinitely backpressured.
    const _count = source.bufferCount(count).subscribe(function(el) {
      log('flushing by buffer size')

      _flush.next(el)
    })

    // Empty observable
    const _empty = source
      .bufferTime(time)
      .filter(x => isEmpty(x))
      .subscribe(function() {
        if (flood) {
          cooldown = cooldown + 1
        }

        // If observable has cooled off, then we should flush the buffered
        // observable.
        if (cooldown > cooldownThresh) {
          log('flushing by message frequency')

          cooldown = 0
          flood = false
          thresh = 0
        }
      })

    // Non-empty observable
    const _nonempty = source
      .bufferTime(time)
      .filter(x => !isEmpty(x))
      .subscribe(function() {
        if (flood) {
          return
        }

        thresh = thresh + 1

        // If there is a constant stream of elements from the observable,
        // we should start backpressuring.
        if (thresh > floodThresh) {
          log('starting backpressure')

          cooldown = 0
          flood = true
          thresh = 0
        }
      })

    const _sub = source
      .bufferWhen(function() {
        return _flush
      })
      .filter(incidents => !isEmpty(incidents))
      .subscribe(
        function onNext(el) {
          observer.next(el)
        },
        err => observer.error(err),
        () => observer.complete()
      )

    return function() {
      clearInterval(timer)

      _empty.unsubscribe()
      _nonempty.unsubscribe()
      _count.unsubscribe()
      _sub.unsubscribe()
    }
  })
}
