"""
This module implements the functionality to calculate the phonon density of
states (PDOS) from MD trajectories. For parsing output files into a format
which is used here, see parse.py and test/* for examples. For a theory
overview, see [1]_.
Other codes which do that are [2]_ and [3]_.
.. [1] doc/source/written/background/phonon_dos.rst
.. [2] tfreq from Tim Teatro
http://www.timteatro.net/2010/09/29/velocity-autocorrelation-and-vibrational-spectrum-calculation
.. [3] fourier.x from CPMD
"""
import os, warnings
import numpy as np
from scipy.fftpack import fft
from pwtools import _flib, num
from pwtools.verbose import verbose
from pwtools.signal import pad_zeros, welch, mirror
[docs]
def pyvacf(vel, m=None, method=3):
"""Reference implementation for calculating the VACF of velocities in 3d
array `vel`. This is slow. Use for debugging only. For production, use
fvacf().
Parameters
----------
vel : 3d array, (nstep, natoms, 3)
Atomic velocities.
m : 1d array (natoms,)
Atomic masses.
method : int
| 1 : 3 loops
| 2 : replace 1 inner loop
| 3 : replace 2 inner loops
Returns
-------
c : 1d array (nstep,)
VACF
"""
natoms = vel.shape[1]
nstep = vel.shape[0]
c = np.zeros((nstep,), dtype=float)
if m is None:
m = np.ones((natoms,), dtype=float)
if method == 1:
# c(t) = <v(t0) v(t0 + t)> / <v(t0)**2> = C(t) / C(0)
#
# "displacements" `t'
for t in range(nstep):
# time origins t0 == j
for j in range(nstep-t):
for i in range(natoms):
c[t] += np.dot(vel[j,i,:], vel[j+t,i,:]) * m[i]
elif method == 2:
# replace 1 inner loop
for t in range(nstep):
for j in range(nstep-t):
# (natoms, 3) * (natoms, 1) -> (natoms, 3)
c[t] += (vel[j,...] * vel[j+t,...] * m[:,None]).sum()
elif method == 3:
# replace 2 inner loops:
# (xx, natoms, 3) * (1, natoms, 1) -> (xx, natoms, 3)
for t in range(nstep):
c[t] = (vel[:(nstep-t),...] * vel[t:,...]*m[None,:,None]).sum()
else:
raise ValueError('unknown method: %s' %method)
# normalize to unity
c = c / c[0]
return c
[docs]
def fvacf(vel, m=None, method=2, nthreads=None):
"""Interface to Fortran function _flib.vacf(). Otherwise same
functionallity as pyvacf(). Use this for production calculations.
Parameters
----------
vel : 3d array, (nstep, natoms, 3)
Atomic velocities.
m : 1d array (natoms,)
Atomic masses.
method : int
| 1 : loops
| 2 : vectorized loops
nthreads : int ot None
If int, then use this many OpenMP threads in the Fortran extension.
Only useful if the extension was compiled with OpenMP support, of
course.
Returns
-------
c : 1d array (nstep,)
VACF
Notes
-----
Fortran extension::
$ python -c "import _flib; print(_flib.vacf.__doc__)"
vacf - Function signature:
c = vacf(v,m,c,method,use_m,[nthreads,natoms,nstep])
Required arguments:
v : input rank-3 array('d') with bounds (natoms,3,nstep)
m : input rank-1 array('d') with bounds (natoms)
c : input rank-1 array('d') with bounds (nstep)
method : input int
use_m : input int
Optional arguments:
nthreads : input int
natoms := shape(v,0) input int
nstep := shape(v,2) input int
Return objects:
c : rank-1 array('d') with bounds (nstep)
Shape of `vel`: The old array shapes were (natoms, 3, nstep), the new is
(nstep,natoms,3). B/c we don't want to adapt flib.f90, we change
vel's shape before passing it to the extension.
See Also
--------
:mod:`pwtools._flib`
:func:`vacf_pdos`
"""
# f2py copies and C-order vs. Fortran-order arrays
# ------------------------------------------------
# With vel = np.asarray(vel, order='F'), we convert vel to F-order and a
# copy is made by numpy. If we don't do it, the f2py wrapper code does.
# This copy is unavoidable, unless we allocate the array vel in F-order in
# the first place.
# c = _flib.vacf(np.asarray(vel, order='F'), m, c, method, use_m)
#
# speed
# -----
# The most costly step is calculating the VACF. FFTing that is only the fft
# of a 1d-array which is fast, even if the length is not a power of two.
# Padding is not needed.
#
natoms = vel.shape[1]
nstep = vel.shape[0]
assert vel.shape[-1] == 3, ("last dim of vel must be 3: (nstep,natoms,3)")
# `c` as "intent(in, out)" could be "intent(out), allocatable" or so,
# makes extension more pythonic, don't pass `c` in, let be allocated on
# Fortran side
c = np.zeros((nstep,), dtype=float)
if m is None:
# dummy
m = np.empty((natoms,), dtype=float)
use_m = 0
else:
use_m = 1
verbose("calling _flib.vacf ...")
if nthreads is None:
# Possible f2py bug workaround: The f2py extension does not always set
# the number of threads correctly according to OMP_NUM_THREADS. Catch
# OMP_NUM_THREADS here and set number of threads using the "nthreads"
# arg.
key = 'OMP_NUM_THREADS'
if key in os.environ:
nthreads = int(os.environ[key])
c = _flib.vacf(vel, m, c, method, use_m, nthreads)
else:
c = _flib.vacf(vel, m, c, method, use_m)
else:
c = _flib.vacf(vel, m, c, method, use_m, nthreads)
verbose("... ready")
return c
[docs]
def pdos(vel, dt=1.0, m=None, full_out=False, area=1.0, window=True,
npad=None, tonext=False, mirr=False, method='direct'):
"""Phonon DOS by FFT of the VACF or direct FFT of atomic velocities.
Integral area is normalized to `area`. It is possible (and recommended) to
zero-padd the velocities (see `npad`).
Parameters
----------
vel : 3d array (nstep, natoms, 3)
atomic velocities
dt : time step
m : 1d array (natoms,),
atomic mass array, if None then mass=1.0 for all atoms is used
full_out : bool
area : float
normalize area under frequency-PDOS curve to this value
window : bool
use Welch windowing on data before FFT (reduces leaking effect,
recommended)
npad : {None, int}
method='direct' only: Length of zero padding along `axis`. `npad=None`
= no padding, `npad > 0` = pad by a length of ``(nstep-1)*npad``. `npad
> 5` usually results in sufficient interpolation.
tonext : bool
method='direct' only: Pad `vel` with zeros along `axis` up to the next
power of two after the array length determined by `npad`. This gives
you speed, but variable (better) frequency resolution.
mirr : bool
method='vacf' only: mirror one-sided VACF at t=0 before fft
Returns
-------
if full_out = False
| ``(faxis, pdos)``
| faxis : 1d array [1/unit(dt)]
| pdos : 1d array, the phonon DOS, normalized to `area`
if full_out = True
| if method == 'direct':
| ``(faxis, pdos, (full_faxis, full_pdos, split_idx))``
| if method == 'vavcf':
| ``(faxis, pdos, (full_faxis, full_pdos, split_idx, vacf, fft_vacf))``
| fft_vacf : 1d complex array, result of fft(vacf) or fft(mirror(vacf))
| vacf : 1d array, the VACF
Examples
--------
>>> from pwtools.constants import fs,rcm_to_Hz
>>> tr = Trajectory(...)
>>> # freq in [Hz] if timestep in [s]
>>> freq,dos = pdos(tr.velocity, m=tr.mass, dt=tr.timestep*fs,
>>> method='direct', npad=1)
>>> # frequency in [1/cm]
>>> plot(freq/rcm_to_Hz, dos)
Notes
-----
padding (only method='direct'): With `npad` we pad the velocities `vel`
with ``npad*(nstep-1)`` zeros along `axis` (the time axis) before FFT
b/c the signal is not periodic. For `npad=1`, this gives us the exact
same spectrum and frequency resolution as with ``pdos(...,
method='vacf',mirr=True)`` b/c the array to be fft'ed has length
``2*nstep-1`` along the time axis in both cases (remember that the
array length = length of the time axis influences the freq.
resolution). FFT is only fast for arrays with length = a power of two.
Therefore, you may get very different fft speeds depending on whether
``2*nstep-1`` is a power of two or not (in most cases it won't). Try
using `tonext` but remember that you get another (better) frequency
resolution.
References
----------
[1] Phys Rev B 47(9) 4863, 1993
See Also
--------
:func:`pwtools.signal.fftsample`
:func:`pwtools.signal.acorr`
:func:`direct_pdos`
:func:`vacf_pdos`
"""
mass = m
# assume vel.shape = (nstep,natoms,3)
axis = 0
assert vel.shape[-1] == 3
if mass is not None:
assert len(mass) == vel.shape[1], "len(mass) != vel.shape[1]"
# define here b/c may be used twice below
mass_bc = mass[None,:,None]
if window:
sl = [None]*vel.ndim
sl[axis] = slice(None) # ':'
vel2 = vel*(welch(vel.shape[axis])[tuple(sl)])
else:
vel2 = vel
# handle options which are mutually exclusive
if method == 'vacf':
assert npad in [0,None], "use npad={0,None} for method='vacf'"
# padding
if npad is not None:
nadd = (vel2.shape[axis]-1)*npad
if tonext:
vel2 = pad_zeros(vel2, tonext=True,
tonext_min=vel2.shape[axis] + nadd,
axis=axis)
else:
vel2 = pad_zeros(vel2, tonext=False, nadd=nadd, axis=axis)
if method == 'direct':
full_fft_vel = np.abs(fft(vel2, axis=axis))**2.0
full_faxis = np.fft.fftfreq(vel2.shape[axis], dt)
split_idx = len(full_faxis)//2
faxis = full_faxis[:split_idx]
# First split the array, then multiply by `mass` and average. If
# full_out, then we need full_fft_vel below, so copy before slicing.
arr = full_fft_vel.copy() if full_out else full_fft_vel
fft_vel = num.slicetake(arr, slice(0, split_idx), axis=axis, copy=False)
if mass is not None:
fft_vel *= mass_bc
# average remaining axes, summing is enough b/c normalization is done below
# sums: (nstep, natoms, 3) -> (nstep, natoms) -> (nstep,)
pdos = num.sum(fft_vel, axis=axis, keepdims=True)
default_out = (faxis, num.norm_int(pdos, faxis, area=area))
if full_out:
# have to re-calculate this here b/c we never calculate the full_pdos
# normally
if mass is not None:
full_fft_vel *= mass_bc
full_pdos = num.sum(full_fft_vel, axis=axis, keepdims=True)
extra_out = (full_faxis, full_pdos, split_idx)
return default_out + extra_out
else:
return default_out
elif method == 'vacf':
vacf = fvacf(vel2, m=mass)
if mirr:
fft_vacf = fft(mirror(vacf))
else:
fft_vacf = fft(vacf)
full_faxis = np.fft.fftfreq(fft_vacf.shape[axis], dt)
full_pdos = np.abs(fft_vacf)
split_idx = len(full_faxis)//2
faxis = full_faxis[:split_idx]
pdos = full_pdos[:split_idx]
default_out = (faxis, num.norm_int(pdos, faxis, area=area))
extra_out = (full_faxis, full_pdos, split_idx, vacf, fft_vacf)
if full_out:
return default_out + extra_out
else:
return default_out
[docs]
def vacf_pdos(vel, *args, **kwds):
"""Wrapper for ``pdos(..., method='vacf', mirr=True, npad=None)``"""
if 'mirr' not in kwds:
kwds['mirr'] = True
return pdos(vel, *args, method='vacf', npad=None, **kwds)
[docs]
def direct_pdos(vel, *args, **kwds):
"""Wrapper for ``pdos(..., method='direct', npad=1)``"""
if 'npad' not in kwds:
kwds['npad'] = 1
if 'pad_tonext' in kwds:
## warnings.simplefilter('always')
warnings.warn("'pad_tonext' was renamed 'tonext'",
DeprecationWarning)
kwds['tonext'] = kwds['pad_tonext']
kwds.pop('pad_tonext')
return pdos(vel, *args, method='direct', **kwds)