# -*- coding: utf-8 -*-
# SPDX-FileCopyrightText: 2012-2025 kaliko <kaliko@azylum.org>
# SPDX-FileCopyrightText: 2021 Wonko der Verständige <wonko@hanstool.org>
# SPDX-FileCopyrightText: 2019 Naglis Jonaitis <naglis@mailbox.org>
# SPDX-FileCopyrightText: 2019 Bart Van Loon <bbb@bbbart.be>
# SPDX-FileCopyrightText: 2008-2010 J. Alexander Treuman <jat@spatialrift.net>
# SPDX-License-Identifier: LGPL-3.0-or-later
"""Python Music Player Daemon client library"""
import logging
import os
import socket
from functools import wraps
# Protocol markers used when parsing the server's text responses
HELLO_PREFIX = "OK MPD "  # start of the greeting line checked in MPDClient._hello
ERROR_PREFIX = "ACK "  # start of an error line, turned into CommandError
SUCCESS = "OK"  # line closing a successful response
NEXT = "list_OK"  # line closing one command's output inside a command list
#: Module version
VERSION = '0.9.2'
#: Seconds before a connection attempt times out
#: (overridden by :envvar:`MPD_TIMEOUT` env. var.)
CONNECTION_TIMEOUT = 30
#: Socket timeout in seconds > 0 (default is :py:obj:`None` for no timeout)
SOCKET_TIMEOUT = None
# Module-level logger, named after this module
log = logging.getLogger(__name__)
[docs]
def iterator_wrapper(func):
    """Decorator handling the ``iterate`` option of the client.

    The wrapped method must return an iterable (usually a generator).
    When ``instance.iterate`` is false the result is materialized into a
    list.  Otherwise a generator is returned and ``instance._iterating``
    stays ``True`` until that generator is exhausted or closed.
    """
    def _tracked(instance, source):
        # Relay every item, then clear the flag however the iteration ends
        try:
            for element in source:
                yield element
        finally:
            instance._iterating = False

    @wraps(func)
    def decorated_function(instance, *args, **kwargs):
        source = func(instance, *args, **kwargs)
        if instance.iterate:
            # Flag is raised right away, before the first item is consumed
            instance._iterating = True
            return _tracked(instance, source)
        return list(source)

    return decorated_function
[docs]
class MPDError(Exception):
    """Main musicpd Exception.

    Base class of every exception defined in this module.
    """
[docs]
class ConnectionError(MPDError):
    """Fatal Connection Error, cannot recover from it.

    Within this module the name shadows the built-in ``ConnectionError``.
    """
[docs]
class ProtocolError(MPDError):
    """Fatal Protocol Error, cannot recover from it.

    Raised when the server sends a line the client cannot interpret
    (invalid greeting, unparsable pair, unexpected return value).
    """
[docs]
class CommandError(MPDError):
    """Malformed command, socket should be fine, can reuse it.

    Also raised for invalid arguments (bad range, new line in a command)
    and for ``ACK`` error lines returned by the server.
    """
class CommandListError(MPDError):
    """Improper use of a command list.

    Raised when ``send_*``/``fetch_*`` is used inside a command list, when
    a command that is not allowed in a list is queued, when a list is begun
    twice, or when a list is ended without having been begun.
    """
class PendingCommandError(MPDError):
    """Conflict with commands sent but not yet fetched.

    Raised when fetching with nothing pending, when fetching a command
    that is not the next pending one, or when executing a command or
    beginning a command list while commands are still pending.
    """
class IteratingError(MPDError):
    """Operation attempted while a previous result is still being iterated.

    Raised by fetch, execute and command list calls when the client's
    ``_iterating`` flag is set.
    """
class Range:
    """Validate a tuple and render it as a ``lower:upper`` range argument.

    Accepted tuples hold zero, one or two elements.  Each element is either
    empty (:py:obj:`None` or ``''``) or convertible with :py:func:`int`.

    :param tuple tpl: the range boundaries, e.g. ``(1, 5)``, ``(3,)`` or ``()``
    :raises CommandError: wrong type, wrong size, non-integer element,
        missing lower bound, or lower bound greater than upper bound
    """

    def __init__(self, tpl):
        self.tpl = tpl
        self.lower = ''
        self.upper = ''
        self._check()

    def __str__(self):
        return f'{self.lower}:{self.upper}'

    def __repr__(self):
        return f'Range({self.tpl})'

    def _check_element(self, item):
        """Return ``item`` as an integer string, or ``''`` when it is empty."""
        if item is None or item == '':
            return ''
        try:
            return str(int(item))
        except (TypeError, ValueError) as err:
            raise CommandError(f'Not an integer: "{item}"') from err

    def _check(self):
        """Validate ``self.tpl`` and fill in ``self.lower``/``self.upper``."""
        if not isinstance(self.tpl, tuple):
            raise CommandError('Wrong type, provide a tuple')
        size = len(self.tpl)
        if size == 0:
            # Fully open range, rendered as ':'
            return
        if size == 1:
            # Only a lower bound, rendered as 'lower:'
            self.lower = self._check_element(self.tpl[0])
            return
        if size != 2:
            raise CommandError('Range wrong size (0, 1 or 2 allowed)')
        self.lower = self._check_element(self.tpl[0])
        self.upper = self._check_element(self.tpl[1])
        if self.lower == '' and self.upper != '':
            raise CommandError(f'Integer expected to start the range: {self.tpl}')
        # Ordering is only checked when both bounds are plain digit strings
        # (negative values carry a '-' and are not compared)
        if self.upper.isdigit() and self.lower.isdigit():
            if int(self.lower) > int(self.upper):
                raise CommandError(f'Wrong range: {self.lower} > {self.upper}')
class _NotConnected:
def __getattr__(self, attr):
return self._dummy
def _dummy(self, *args):
raise ConnectionError("Not connected")
[docs]
class MPDClient:
"""MPDClient instance will look for :envvar:`MPD_HOST`/:envvar:`MPD_PORT`/:envvar:`XDG_RUNTIME_DIR` environment
variables and set instance attribute :py:attr:`host`, :py:attr:`port` and :py:obj:`pwd`
accordingly.
Then :py:obj:`musicpd.MPDClient.connect` will use :py:obj:`host` and
:py:obj:`port` as defaults if not provided as args.
Regarding the :envvar:`MPD_HOST` format used to expose a password, refer to this module's
documentation or to the MPD client manual :manpage:`mpc (1)`.
>>> from os import environ
>>> environ['MPD_HOST'] = 'pass@mpdhost'
>>> cli = musicpd.MPDClient()
>>> cli.pwd == environ['MPD_HOST'].split('@')[0]
True
>>> cli.host == environ['MPD_HOST'].split('@')[1]
True
>>> cli.connect() # will use host/port as set in MPD_HOST/MPD_PORT
.. note::
default host:
* use :envvar:`MPD_HOST` environment variable if set, extract password if present,
* else use :envvar:`XDG_RUNTIME_DIR` to look for an existing file in ``${XDG_RUNTIME_DIR:-/run/}/mpd/socket``
* else set host to ``localhost``
default port:
* use :envvar:`MPD_PORT` environment variable if set
* else use ``6600``
.. warning:: **Instance attribute host/port/pwd**
While :py:attr:`musicpd.MPDClient.host` and
:py:attr:`musicpd.MPDClient.port` keep track of current connection
host and port, :py:attr:`musicpd.MPDClient.pwd` is set once with
password extracted from environment variable.
Calling MPD's password method with a new password
won't update the :py:attr:`musicpd.MPDClient.pwd` value.
Moreover, :py:attr:`musicpd.MPDClient.pwd` is only a helper attribute
exposing password extracted from :envvar:`MPD_HOST` environment variable, it
will not be used as default value for the MPD's password command.
"""
def __init__(self):
    """Set up client state, the command dispatch table and default host/port.

    No connection is opened here; see ``connect``.
    """
    # When true, list-returning commands hand back generators (see iterator_wrapper)
    self.iterate = False
    #: Socket timeout value in seconds
    self._socket_timeout = SOCKET_TIMEOUT
    #: Current connection timeout value, defaults to
    #: :py:obj:`CONNECTION_TIMEOUT` or env. var. ``MPD_TIMEOUT`` if provided
    self.mpd_timeout = None
    self.mpd_version = ''
    """Protocol version as exposed by the server as a :py:obj:`str`
.. note:: This is the version of the protocol spoken, not the real version of the daemon."""
    self._reset()
    # Maps each MPD command to the method parsing its response.
    # A None value marks a command for which no response is read.
    self._commands = {
        # Querying MPD's status
        "clearerror": self._fetch_nothing,
        "currentsong": self._fetch_object,
        "idle": self._fetch_list,
        # "noidle" is not dispatched through this table, see the noidle() method
        #"noidle": None,
        "status": self._fetch_object,
        "stats": self._fetch_object,
        # Playback options
        "consume": self._fetch_nothing,
        "crossfade": self._fetch_nothing,
        "mixrampdb": self._fetch_nothing,
        "mixrampdelay": self._fetch_nothing,
        "random": self._fetch_nothing,
        "repeat": self._fetch_nothing,
        "setvol": self._fetch_nothing,
        "getvol": self._fetch_object,
        "single": self._fetch_nothing,
        "replay_gain_mode": self._fetch_nothing,
        "replay_gain_status": self._fetch_item,
        "volume": self._fetch_nothing,
        # Controlling playback
        "next": self._fetch_nothing,
        "pause": self._fetch_nothing,
        "play": self._fetch_nothing,
        "playid": self._fetch_nothing,
        "previous": self._fetch_nothing,
        "seek": self._fetch_nothing,
        "seekid": self._fetch_nothing,
        "seekcur": self._fetch_nothing,
        "stop": self._fetch_nothing,
        # The Queue
        "add": self._fetch_nothing,
        "addid": self._fetch_item,
        "clear": self._fetch_nothing,
        "delete": self._fetch_nothing,
        "deleteid": self._fetch_nothing,
        "move": self._fetch_nothing,
        "moveid": self._fetch_nothing,
        "playlist": self._fetch_playlist,
        "playlistfind": self._fetch_songs,
        "playlistid": self._fetch_songs,
        "playlistinfo": self._fetch_songs,
        "playlistsearch": self._fetch_songs,
        "plchanges": self._fetch_songs,
        "plchangesposid": self._fetch_changes,
        "prio": self._fetch_nothing,
        "prioid": self._fetch_nothing,
        "rangeid": self._fetch_nothing,
        "shuffle": self._fetch_nothing,
        "swap": self._fetch_nothing,
        "swapid": self._fetch_nothing,
        "addtagid": self._fetch_nothing,
        "cleartagid": self._fetch_nothing,
        # Stored playlists
        "listplaylist": self._fetch_list,
        "listplaylistinfo": self._fetch_songs,
        "searchplaylist": self._fetch_songs,
        "listplaylists": self._fetch_playlists,
        "load": self._fetch_nothing,
        "playlistadd": self._fetch_nothing,
        "playlistclear": self._fetch_nothing,
        "playlistdelete": self._fetch_nothing,
        "playlistlength": self._fetch_object,
        "playlistmove": self._fetch_nothing,
        "rename": self._fetch_nothing,
        "rm": self._fetch_nothing,
        "save": self._fetch_nothing,
        # The music database
        "albumart": self._fetch_composite,
        "count": self._fetch_object,
        "getfingerprint": self._fetch_object,
        "find": self._fetch_songs,
        "findadd": self._fetch_nothing,
        "list": self._fetch_list,
        "listall": self._fetch_database,
        "listallinfo": self._fetch_database,
        "listfiles": self._fetch_database,
        "lsinfo": self._fetch_database,
        "readcomments": self._fetch_object,
        "readpicture": self._fetch_composite,
        "search": self._fetch_songs,
        "searchadd": self._fetch_nothing,
        "searchaddpl": self._fetch_nothing,
        "searchcount": self._fetch_object,
        "update": self._fetch_item,
        "rescan": self._fetch_item,
        # Mounts and neighbors
        "mount": self._fetch_nothing,
        "unmount": self._fetch_nothing,
        "listmounts": self._fetch_mounts,
        "listneighbors": self._fetch_neighbors,
        # Stickers (multi-word commands are reached as e.g. sticker_get, see __getattr__)
        "sticker get": self._fetch_item,
        "sticker set": self._fetch_nothing,
        "sticker delete": self._fetch_nothing,
        "sticker list": self._fetch_list,
        "sticker find": self._fetch_songs,
        "stickernames": self._fetch_list,
        "stickertypes": self._fetch_list,
        "stickernamestypes": self._fetch_list,
        # Connection settings
        "close": None,
        "kill": None,
        "password": self._fetch_nothing,
        "ping": self._fetch_nothing,
        "binarylimit": self._fetch_nothing,
        "tagtypes": self._fetch_list,
        "tagtypes disable": self._fetch_nothing,
        "tagtypes enable": self._fetch_nothing,
        "tagtypes clear": self._fetch_nothing,
        "tagtypes all": self._fetch_nothing,
        "protocol": self._fetch_list,
        "protocol disable": self._fetch_nothing,
        "protocol enable": self._fetch_nothing,
        "protocol clear": self._fetch_nothing,
        "protocol all": self._fetch_nothing,
        "protocol available": self._fetch_list,
        # Partition commands
        "partition": self._fetch_nothing,
        "listpartitions": self._fetch_list,
        "newpartition": self._fetch_nothing,
        "delpartition": self._fetch_nothing,
        "moveoutput": self._fetch_nothing,
        # Audio output devices
        "disableoutput": self._fetch_nothing,
        "enableoutput": self._fetch_nothing,
        "toggleoutput": self._fetch_nothing,
        "outputs": self._fetch_outputs,
        "outputset": self._fetch_nothing,
        # Reflection
        "config": self._fetch_object,
        "commands": self._fetch_list,
        "notcommands": self._fetch_list,
        "urlhandlers": self._fetch_list,
        "decoders": self._fetch_plugins,
        # Client to client
        "subscribe": self._fetch_nothing,
        "unsubscribe": self._fetch_nothing,
        "channels": self._fetch_list,
        "readmessages": self._fetch_messages,
        "sendmessage": self._fetch_nothing,
    }
    #: host used with the current connection (:py:obj:`str`)
    self.host = None
    #: password detected in :envvar:`MPD_HOST` environment variable (:py:obj:`str`)
    self.pwd = None
    #: port used with the current connection (:py:obj:`int`, :py:obj:`str`)
    self.port = None
    # Replace the None placeholders above with values derived from the environment
    self._get_envvars()
def _get_envvars(self):
    """
    Retrieve MPD env. var. to override the default "localhost:6600".

    Sets :py:attr:`host`, :py:attr:`port`, :py:attr:`pwd` (only when a
    password is found) and :py:attr:`mpd_timeout` from ``MPD_HOST``,
    ``MPD_PORT``, ``XDG_RUNTIME_DIR`` and ``MPD_TIMEOUT``.
    """
    # Set some defaults
    self.host = 'localhost'
    self.port = os.getenv('MPD_PORT', '6600')
    _host = os.getenv('MPD_HOST', '')
    if _host:
        # If password is set: MPD_HOST=pass@host
        if '@' in _host:
            mpd_host_env = _host.split('@', 1)
            if mpd_host_env[0]:
                # A password is actually set
                log.debug('password detected in MPD_HOST, set client pwd attribute')
                self.pwd = mpd_host_env[0]
                if mpd_host_env[1]:
                    self.host = mpd_host_env[1]
                    log.debug('host detected in MPD_HOST: %s', self.host)
            elif mpd_host_env[1]:
                # No password set but leading @ is an abstract socket
                self.host = '@'+mpd_host_env[1]
                log.debug('host detected in MPD_HOST: %s (abstract socket)', self.host)
        else:
            # MPD_HOST is a plain host
            self.host = _host
            log.debug('host detected in MPD_HOST: %s', self.host)
    else:
        # Is socket there
        xdg_runtime_dir = os.getenv('XDG_RUNTIME_DIR', '/run')
        rundir = os.path.join(xdg_runtime_dir, 'mpd/socket')
        if os.path.exists(rundir):
            self.host = rundir
            log.debug('host detected in ${XDG_RUNTIME_DIR}/run: %s (unix socket)', self.host)
    _mpd_timeout = os.getenv('MPD_TIMEOUT', '')
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits that int() refuses with a ValueError.
    # A zero timeout is ignored as well, since settimeout(0) would put the
    # socket in non-blocking mode instead of setting a delay.
    if _mpd_timeout.isdecimal() and int(_mpd_timeout) > 0:
        self.mpd_timeout = int(_mpd_timeout)
        log.debug('timeout detected in MPD_TIMEOUT: %d', self.mpd_timeout)
    else:  # Use CONNECTION_TIMEOUT as default even if MPD_TIMEOUT carries garbage
        self.mpd_timeout = CONNECTION_TIMEOUT
def __getattr__(self, attr):
if attr == 'send_noidle': # have send_noidle to cancel idle as well as noidle
return self.noidle
if attr.startswith("send_"):
command = attr.replace("send_", "", 1)
wrapper = self._send
elif attr.startswith("fetch_"):
command = attr.replace("fetch_", "", 1)
wrapper = self._fetch
else:
command = attr
wrapper = self._execute
if command not in self._commands:
command = command.replace("_", " ")
if command not in self._commands:
cls = self.__class__.__name__
raise AttributeError(f"'{cls}' object has no attribute '{attr}'")
return lambda *args: wrapper(command, args)
def _send(self, command, args):
if self._command_list is not None:
raise CommandListError("Cannot use send_%s in a command list" %
command.replace(" ", "_"))
self._write_command(command, args)
retval = self._commands[command]
if retval is not None:
self._pending.append(command)
def _fetch(self, command, args=None): # pylint: disable=unused-argument
cmd_fmt = command.replace(" ", "_")
if self._command_list is not None:
raise CommandListError(f"Cannot use fetch_{cmd_fmt} in a command list")
if self._iterating:
raise IteratingError(f"Cannot use fetch_{cmd_fmt} while iterating")
if not self._pending:
raise PendingCommandError("No pending commands to fetch")
if self._pending[0] != command:
raise PendingCommandError(f"'{command}' is not the currently pending command")
del self._pending[0]
retval = self._commands[command]
if callable(retval):
return retval()
return retval
def _execute(self, command, args): # pylint: disable=unused-argument
if self._iterating:
raise IteratingError(f"Cannot execute '{command}' while iterating")
if self._pending:
raise PendingCommandError(f"Cannot execute '{command}' with pending commands")
retval = self._commands[command]
if self._command_list is not None:
if not callable(retval):
raise CommandListError(f"'{command}' not allowed in command list")
self._write_command(command, args)
self._command_list.append(retval)
else:
self._write_command(command, args)
if callable(retval):
return retval()
return retval
return None
def _write_line(self, line):
self._wfile.write(f"{line!s}\n")
self._wfile.flush()
def _write_command(self, command, args=None):
if args is None:
args = []
parts = [command]
for arg in args:
if isinstance(arg, tuple):
parts.append(f'{Range(arg)!s}')
else:
parts.append(f'"{escape(str(arg))}"')
if '\n' in ' '.join(parts):
raise CommandError('new line found in the command!')
self._write_line(" ".join(parts))
def _read_binary(self, amount):
chunk = bytearray()
while amount > 0:
result = self._rbfile.read(amount)
if len(result) == 0:
self.disconnect()
raise ConnectionError("Connection lost while reading binary content")
chunk.extend(result)
amount -= len(result)
return bytes(chunk)
def _read_line(self, binary=False):
    """Read one response line and interpret the protocol markers.

    :param bool binary: read from the binary file (decoded as UTF-8)
        instead of the text file
    :return: the line without its trailing new line, or ``None`` when the
        line closes a response (``OK``, or ``list_OK`` in a command list)
    :raises ConnectionError: the line is not terminated by a new line; the
        client is disconnected first
    :raises CommandError: the server answered with an ``ACK`` error line
    :raises ProtocolError: ``OK`` received inside a command list
    """
    source = self._rbfile if binary else self._rfile
    line = source.readline()
    if binary:
        line = line.decode('utf-8')
    if not line.endswith("\n"):
        self.disconnect()
        raise ConnectionError("Connection lost while reading line")
    line = line.rstrip("\n")
    if line.startswith(ERROR_PREFIX):
        raise CommandError(line[len(ERROR_PREFIX):].strip())
    in_command_list = self._command_list is not None
    if in_command_list and line == NEXT:
        return None
    if line == SUCCESS:
        if in_command_list:
            # Each command of a list must be closed by list_OK, not OK
            raise ProtocolError(f"Got unexpected '{SUCCESS}'")
        return None
    return line
def _read_pair(self, separator, binary=False):
line = self._read_line(binary=binary)
if line is None:
return None
pair = line.split(separator, 1)
if len(pair) < 2:
raise ProtocolError(f"Could not parse pair: '{line}'")
return pair
def _read_pairs(self, separator=": ", binary=False):
pair = self._read_pair(separator, binary=binary)
while pair:
yield pair
pair = self._read_pair(separator, binary=binary)
def _read_list(self):
seen = None
for key, value in self._read_pairs():
if key != seen:
if seen is not None:
raise ProtocolError(f"Expected key '{seen}', got '{key}'")
seen = key
yield value
def _read_playlist(self):
for _, value in self._read_pairs(":"):
yield value
def _read_objects(self, delimiters=None):
obj = {}
if delimiters is None:
delimiters = []
for key, value in self._read_pairs():
key = key.lower()
if obj:
if key in delimiters:
yield obj
obj = {}
elif key in obj:
if not isinstance(obj[key], list):
obj[key] = [obj[key], value]
else:
obj[key].append(value)
continue
obj[key] = value
if obj:
yield obj
def _read_command_list(self):
try:
for retval in self._command_list:
yield retval()
finally:
self._command_list = None
self._fetch_nothing()
def _fetch_nothing(self):
line = self._read_line()
if line is not None:
raise ProtocolError(f"Got unexpected return value: '{line}'")
def _fetch_item(self):
pairs = list(self._read_pairs())
if len(pairs) != 1:
return None
return pairs[0][1]
@iterator_wrapper
def _fetch_list(self):
    """Fetch a response made of same-key lines, as a list or an iterator."""
    values = self._read_list()
    return values
@iterator_wrapper
def _fetch_playlist(self):
    """Fetch the ``playlist`` command response, as a list or an iterator."""
    entries = self._read_playlist()
    return entries
def _fetch_object(self):
objs = list(self._read_objects())
if not objs:
return {}
return objs[0]
@iterator_wrapper
def _fetch_objects(self, delimiters):
    """Fetch a response as dictionaries split on the ``delimiters`` keys."""
    objects = self._read_objects(delimiters)
    return objects
def _fetch_changes(self):
return self._fetch_objects(["cpos"])
def _fetch_songs(self):
return self._fetch_objects(["file"])
def _fetch_playlists(self):
return self._fetch_objects(["playlist"])
def _fetch_database(self):
return self._fetch_objects(["file", "directory", "playlist"])
def _fetch_outputs(self):
return self._fetch_objects(["outputid"])
def _fetch_plugins(self):
return self._fetch_objects(["plugin"])
def _fetch_messages(self):
return self._fetch_objects(["channel"])
def _fetch_mounts(self):
return self._fetch_objects(["mount"])
def _fetch_neighbors(self):
return self._fetch_objects(["neighbor"])
def _fetch_composite(self):
obj = {}
for key, value in self._read_pairs(binary=True):
key = key.lower()
obj[key] = value
if key == 'binary':
break
if not obj:
# If the song file was recognized, but there is no picture, the
# response is successful, but is otherwise empty.
return obj
amount = int(obj['binary'])
try:
obj['data'] = self._read_binary(amount)
except IOError as err:
raise ConnectionError(f'Error reading binary content: {err}') from err
data_bytes = len(obj['data'])
if data_bytes != amount: # can we ever get there?
raise ConnectionError('Error reading binary content: '
f'Expects {amount}B, got {data_bytes}')
# Fetches trailing new line
self._read_line(binary=True)
# Fetches SUCCESS code
self._read_line(binary=True)
return obj
@iterator_wrapper
def _fetch_command_list(self):
    """Fetch the results of a command list, as a list or an iterator."""
    results = self._read_command_list()
    return results
def _hello(self):
    """Read the server greeting and record the protocol version.

    :raises ConnectionError: the greeting line is incomplete
    :raises ProtocolError: the line does not start with ``OK MPD ``
    """
    greeting = self._rfile.readline()
    if not greeting.endswith("\n"):
        raise ConnectionError("Connection lost while reading MPD hello")
    greeting = greeting.rstrip("\n")
    if greeting.startswith(HELLO_PREFIX):
        # What follows the prefix is the protocol version
        self.mpd_version = greeting[len(HELLO_PREFIX):].strip()
        return
    raise ProtocolError(f"Got invalid MPD hello: '{greeting}'")
def _reset(self):
    """Put the client back in its initial, not connected, state."""
    # Connection objects: no socket, placeholder files raising when used
    self._sock = None
    self._rfile, self._rbfile, self._wfile = (_NotConnected() for _ in range(3))
    # Protocol state
    self.mpd_version = ''
    self._command_list = None
    self._pending = []
    self._iterating = False
def _connect_unix(self, path):
    """Open a unix domain socket connected to ``path``.

    A leading ``@`` designates an abstract socket.  ``self.mpd_timeout``
    applies to the connection attempt, ``self.socket_timeout`` afterwards.

    :param str path: socket path, or ``@name`` for an abstract socket
    :return: the connected socket
    :raises ConnectionError: unix sockets unsupported, or connection failure
    """
    if not hasattr(socket, "AF_UNIX"):
        raise ConnectionError("Unix domain sockets not supported on this platform")
    # abstract socket
    if path.startswith('@'):
        path = '\0'+path[1:]
    sock = None
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.mpd_timeout)
        sock.connect(path)
        sock.settimeout(self.socket_timeout)
    except socket.error as socket_err:
        # Do not leak the descriptor when the connection fails
        # (same cleanup as in _connect_tcp)
        if sock is not None:
            sock.close()
        raise ConnectionError(socket_err) from socket_err
    return sock
def _connect_tcp(self, host, port):
    """Open a TCP socket connected to ``host``:``port``.

    Every address returned by ``getaddrinfo`` is tried in turn; the first
    successful connection is returned.  ``self.mpd_timeout`` applies to
    each connection attempt, ``self.socket_timeout`` afterwards.

    :return: the connected socket
    :raises ConnectionError: name resolution failed or no address could be
        reached (the last socket error is attached as the cause)
    """
    try:
        flags = socket.AI_ADDRCONFIG
    except AttributeError:
        # Flag not available on this platform
        flags = 0
    err = None
    try:
        gai = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                 socket.SOCK_STREAM, socket.IPPROTO_TCP,
                                 flags)
    except socket.error as gaierr:
        raise ConnectionError(gaierr) from gaierr
    for res in gai:
        af, socktype, proto, _, sa = res
        sock = None
        try:
            log.debug('opening socket %s', sa)
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(self.mpd_timeout)
            sock.connect(sa)
            sock.settimeout(self.socket_timeout)
            return sock
        except socket.error as socket_err:
            log.debug('opening socket %s failed: %s', sa, socket_err)
            # Remember the failure, then try the next address
            err = socket_err
            if sock is not None:
                sock.close()
    if err is not None:
        # Chain the cause, as done for the other connection errors
        raise ConnectionError(err) from err
    raise ConnectionError("getaddrinfo returns an empty list")
def noidle(self):
# noidle's special case
if not self._pending or self._pending[0] != 'idle':
raise CommandError('cannot send noidle if send_idle was not called')
del self._pending[0]
self._write_command("noidle")
return self._fetch_list()
[docs]
def connect(self, host=None, port=None):
"""Connects the MPD server
:param str host: hostname, IP or FQDN (defaults to *localhost* or socket)
:param port: port number (defaults to *6600*)
:type port: str or int
If host/port are :py:obj:`None` the socket uses :py:attr:`host`/:py:attr:`port`
attributes as defaults. Cf. :py:obj:`MPDClient` for the logic behind default host/port.
The underlying socket also honors :envvar:`MPD_TIMEOUT` environment variable
and defaults to :py:obj:`musicpd.CONNECTION_TIMEOUT` (connect command only).
If you want to have a timeout for each command once you got connected,
set its value in :py:obj:`MPDClient.socket_timeout` (in second) or at
module level in :py:obj:`musicpd.SOCKET_TIMEOUT`.
"""
if not host:
host = self.host
else:
self.host = host
if not port:
port = self.port
else:
self.port = port
if self._sock is not None:
raise ConnectionError("Already connected")
if host[0] in ['/', '@']:
log.debug('Connecting unix socket %s', host)
self._sock = self._connect_unix(host)
else:
log.debug('Connecting tcp socket %s:%s (timeout: %ss)', host, port, self.mpd_timeout)
self._sock = self._connect_tcp(host, port)
self._rfile = self._sock.makefile("r", encoding='utf-8', errors='surrogateescape')
self._rbfile = self._sock.makefile("rb")
self._wfile = self._sock.makefile("w", encoding='utf-8')
try:
self._hello()
except:
self.disconnect()
raise
log.debug('Connected')
@property
def socket_timeout(self):
"""Socket timeout in second (defaults to :py:obj:`SOCKET_TIMEOUT`).
Use :py:obj:`None` to disable socket timout.
:setter: Set the socket timeout (integer > 0)
:type: int or None
"""
return self._socket_timeout
@socket_timeout.setter
def socket_timeout(self, timeout):
if timeout is not None:
if int(timeout) <= 0:
raise ValueError('socket_timeout expects a non zero positive integer')
self._socket_timeout = int(timeout)
else:
self._socket_timeout = timeout
if getattr(self._sock, 'settimeout', False):
self._sock.settimeout(self._socket_timeout)
[docs]
def disconnect(self):
    """Closes the MPD connection.

    The client closes the actual socket, it does not use the
    'close' request from MPD protocol (as suggested in documentation).

    Calling it on a client that is not connected is a no-op.
    """
    try:
        for stream in (self._rfile, self._rbfile, self._wfile):
            # hasattr() cannot tell the placeholders apart: _NotConnected
            # answers every attribute lookup with a callable that raises
            # ConnectionError, so test the type instead
            if not isinstance(stream, _NotConnected):
                stream.close()
    finally:
        # Even if closing a file failed, release the socket and reset state
        try:
            if self._sock is not None:
                self._sock.close()
        finally:
            self._reset()
def __enter__(self):
self.connect()
return self
def __exit__(self, exception_type, exception_value, exception_traceback):
self.disconnect()
[docs]
def fileno(self):
"""Return the socket’s file descriptor (a small integer).
This is useful with :py:obj:`select.select`.
"""
if self._sock is None:
raise ConnectionError("Not connected")
return self._sock.fileno()
def command_list_ok_begin(self):
if self._command_list is not None:
raise CommandListError("Already in command list")
if self._iterating:
raise IteratingError("Cannot begin command list while iterating")
if self._pending:
raise PendingCommandError("Cannot begin command list with pending commands")
self._write_command("command_list_ok_begin")
self._command_list = []
def command_list_end(self):
if self._command_list is None:
raise CommandListError("Not in command list")
if self._iterating:
raise IteratingError("Already iterating over a command list")
self._write_command("command_list_end")
return self._fetch_command_list()
def escape(text):
    """Backslash-escape the backslashes and double quotes found in ``text``."""
    # One pass over the string: each special character gets a backslash prefix
    table = str.maketrans({"\\": "\\\\", '"': '\\"'})
    return text.translate(table)
# vim: set expandtab shiftwidth=4 softtabstop=4 textwidth=79: