"""
This module contains handlers that forward logs from the Python ``logging`` module
to a Redis database.
"""
import logging
import pickle
import json
import redis
# Record attributes forwarded when a handler is not given an explicit field
# list: the log message, the log level name and the creation timestamp.
DEFAULT_FIELDS = ["msg", "levelname", "created"]
class RedisLogHandler(logging.Handler):
    """Base class for the Redis log handlers.

    It owns the Redis client and the batching buffer; subclasses provide
    ``emit`` (fill the buffer) and ``_buffer_emit`` (send the buffer to Redis).

    Attributes
    ----------
    redis : redis.Redis
        The Redis client.
    batch_size : int
        The batch size, if this value is > 1, logs will be processed by batches.
    log_buffer : list
        The list containing the batched logs.

    Methods
    -------
    emit(record: logging.LogRecord)
        This method is intended to be implemented by subclasses and so raises
        a NotImplementedError.
    close()
        Flush the remaining buffered logs, then close the handler.
    """

    def __init__(self, redis_client: redis.Redis = None, batch_size: int = 1,
                 check_conn: bool = True, **redis_args) -> None:
        """Init RedisLogHandler

        Parameters
        ----------
        redis_client : redis.Redis, optional
            The Redis client to forward logs to, by default None. When None,
            a client is built from `redis_args`.
        batch_size : int, optional
            The batch size, if > 1 logs will be processed by batches, by default 1.
        check_conn : bool, optional
            Whether or not to check that Redis is available with a ping,
            by default True.
        **redis_args
            Keyword arguments given to ``redis.Redis`` when `redis_client`
            is None; ignored otherwise.

        Raises
        ------
        TypeError
            Raised if one of the additional arguments passed to Redis is invalid.
        ConnectionError
            Raised if the Redis DB is unavailable.
        """
        super().__init__()

        if redis_client is not None:
            self.redis = redis_client
        else:
            try:
                self.redis = redis.Redis(**redis_args)
            except TypeError as err:
                raise TypeError(
                    "One of the argument passed to Redis is not valid") from err

        if check_conn:
            # Ping Redis so a dead server is reported at construction time
            # (as the builtin ConnectionError) rather than on the first log.
            try:
                self.redis.ping()
            except redis.exceptions.ConnectionError as err:
                raise ConnectionError("Unable to ping Redis DB") from err

        self.batch_size = batch_size
        self.log_buffer = []

    def emit(self, record: logging.LogRecord) -> None:
        """Abstract: subclasses must turn `record` into a buffered entry."""
        raise NotImplementedError(
            "emit must be implemented by RedisLogHandler subclasses")

    def _buffer_emit(self):
        """Abstract: subclasses must send `log_buffer` to Redis and reset it."""
        raise NotImplementedError(
            "_buffer_emit must be implemented by RedisLogHandler subclasses")

    def _check_buff_and_emit(self):
        """Flush the buffer once it holds at least `batch_size` entries."""
        if len(self.log_buffer) >= self.batch_size:
            self._buffer_emit()

    def close(self):
        """Send the logs still in the buffer to Redis, then close the handler.

        The base-class ``close`` always runs, even when the final flush raises,
        so the handler is never left registered with the logging module after
        a failed flush. The flush error itself still propagates to the caller.
        """
        try:
            if self.log_buffer:
                self._buffer_emit()
        finally:
            super().close()
class RedisStreamLogHandler(RedisLogHandler):
    """Handler used to forward logs to a Redis stream.

    Attributes
    ----------
    redis : redis.Redis
        The Redis client.
    batch_size : int
        The batch size, if this value is > 1, logs will be processed by batches.
    log_buffer : list
        The list containing the batched logs.
    stream_name : str
        The name of the Redis stream.
    maxlen : int or None
        The ``maxlen`` value given to every ``XADD``.
    approximate : bool
        The ``approximate`` value given to every ``XADD``.
    fields : list(str)
        The list of logs fields to forward.
    as_pkl : bool
        If true, the logs are written as pickle format in the stream.
    as_json : bool
        If true, the logs are written as JSON in the stream.

    Methods
    -------
    emit(record: logging.LogRecord)
        Forward log to the Redis stream.

    Notes
    -----
    Redis streams: https://redis.io/docs/data-types/streams/
    """

    def __init__(self, redis_client: redis.Redis = None, batch_size: int = 1,
                 check_conn: bool = True, stream_name: str = "logs",
                 maxlen: int = None, approximate: bool = True,
                 fields: list = None, as_pkl: bool = False, as_json: bool = False,
                 **redis_args) -> None:
        """Init RedisStreamLogHandler

        Parameters
        ----------
        redis_client : redis.Redis, optional
            The Redis client to forward logs to, by default None.
        batch_size : int, optional
            The batch size, if > 1 logs will be processed by batches, by default 1.
        check_conn : bool, optional
            Whether or not to check that Redis is available with a ping,
            by default True.
        stream_name : str, optional
            The name of the Redis stream where the logs are stored, by default "logs".
        maxlen : int, optional
            The maximum length of the Redis stream, forwarded as-is to ``XADD``,
            by default None (the stream is not capped).
        approximate : bool, optional
            If True, the Redis size won't be exactly equals to `maxlen`, but will
            be at least `maxlen`, by default True.
        fields : list, optional
            The list of logs fields to save, by default None (the module
            default fields are used).
        as_pkl : bool, optional
            Whether to save the log as its pickle format or not, by default False.
            Takes precedence over `as_json` when both are set.
        as_json : bool, optional
            Whether to save the log as JSON format or not, by default False.

        Notes
        -----
        More info about Redis capped stream:
        https://redis.io/docs/data-types/streams-tutorial/#capped-streams
        """
        super().__init__(redis_client, batch_size, check_conn, **redis_args)

        self.stream_name = stream_name
        self.maxlen = maxlen
        self.approximate = approximate
        self.as_pkl = as_pkl
        self.as_json = as_json
        # Keep a private copy: the handler must not alias the module-level
        # default list (mutating one handler's fields would change them all),
        # and a one-shot iterable must survive being read for every record.
        self.fields = list(fields if fields is not None else DEFAULT_FIELDS)

    def emit(self, record: logging.LogRecord):
        """Write the log record in the Redis stream.

        Every time a log is emitted, an entry is inserted in the stream.
        This entry is a dict whose format depends on the handler
        attributes.
        If `as_pkl` is set to true, the records are saved as
        their pickle format with the key "pkl". If `as_json` is set to true,
        the records are saved as their JSON representation with the key "json".
        Otherwise we use the different fields as keys and their associated value
        in the record as the value.

        If `batch_size=n`, the logs are emitted by batches of size `n`.

        Parameters
        ----------
        record : logging.LogRecord
            The log record to emit.
        """
        stream_entry = _make_entry(record, self.fields, self.as_pkl, self.as_json)
        self.log_buffer.append(stream_entry)
        self._check_buff_and_emit()

    def _buffer_emit(self):
        """Emit the logs batched in log buffer.

        All buffered entries are queued on one pipeline and sent with a single
        ``execute``. The buffer is only reset after ``execute`` returns, so a
        failed flush keeps its entries for the next attempt.
        """
        pipe = self.redis.pipeline()
        for log in self.log_buffer:
            pipe.xadd(self.stream_name, log, maxlen=self.maxlen,
                      approximate=self.approximate)
        pipe.execute()
        self.log_buffer = []
class RedisPubSubLogHandler(RedisLogHandler):
    """Handler used to publish logs to a Redis pub/sub channel.

    Attributes
    ----------
    redis : redis.Redis
        The Redis client.
    batch_size : int
        The batch size, if this value is > 1, logs will be processed by batches.
    log_buffer : list
        The list containing the batched logs.
    channel_name : str
        The name of the Redis pub/sub channel.
    fields : list(str)
        The list of logs fields to forward.
    as_pkl : bool
        If true, the logs are written as pickle format in the message.

    Methods
    -------
    emit(record: logging.LogRecord)
        Publish log to the Redis pub/sub channel.

    Notes
    -----
    Redis pub/sub: https://redis.io/docs/manual/pubsub/
    """

    def __init__(self, redis_client: redis.Redis = None, batch_size: int = 1,
                 check_conn: bool = True, channel_name: str = "logs",
                 fields: list = None, as_pkl: bool = False, **redis_args) -> None:
        """Init RedisPubSubLogHandler

        Parameters
        ----------
        redis_client : redis.Redis, optional
            The Redis client to forward logs to, by default None.
        batch_size : int, optional
            The batch size, if > 1 logs will be processed by batches, by default 1.
        check_conn : bool, optional
            Whether or not to check that Redis is available with a ping,
            by default True.
        channel_name : str, optional
            The name of the Redis pub/sub channel where the logs are pushed,
            by default "logs".
        fields : list, optional
            The list of logs fields to save, by default None (the module
            default fields are used).
        as_pkl : bool, optional
            Whether to save the log as its pickle format or not, by default False.
        """
        super().__init__(redis_client, batch_size, check_conn, **redis_args)

        self.channel_name = channel_name
        self.as_pkl = as_pkl
        # Keep a private copy: the handler must not alias the module-level
        # default list (mutating one handler's fields would change them all),
        # and a one-shot iterable must survive being read for every record.
        self.fields = list(fields if fields is not None else DEFAULT_FIELDS)

    def emit(self, record: logging.LogRecord):
        """Publish the log record in the Redis pub/sub channel.

        Every time a log is emitted, a message is buffered for the channel.
        Its format depends on the handler attributes. If `as_pkl` is set to
        true, the message is the raw pickle of the record. Otherwise the
        message is a JSON object using the different fields as keys and their
        associated value in the record as the value (default fields are used
        if not specified).

        If `batch_size=n`, the messages are published by batches of size `n`.

        Parameters
        ----------
        record : logging.LogRecord
            The log record to emit.
        """
        log_entry = _make_entry(record, self.fields, self.as_pkl,
                                raw_pkl=True)
        if self.as_pkl:
            # Already bytes: publish the pickle untouched.
            self.log_buffer.append(log_entry)
        else:
            self.log_buffer.append(json.dumps(log_entry))
        self._check_buff_and_emit()

    def _buffer_emit(self):
        """Emit the logs batched in log buffer.

        All buffered messages are queued on one pipeline and sent with a single
        ``execute``. The buffer is only reset after ``execute`` returns, so a
        failed flush keeps its messages for the next attempt.
        """
        pipe = self.redis.pipeline()
        for log in self.log_buffer:
            pipe.publish(self.channel_name, log)
        pipe.execute()
        self.log_buffer = []
def _make_fields(record, fields):
"""Return the fields dict for the log record.
If all the specified fields are invalid, use the default fields.
"""
field_dict = {field: getattr(record, field)
for field in fields if hasattr(record, field)}
if field_dict == {}:
return {field: getattr(record, field)
for field in DEFAULT_FIELDS if hasattr(record, field)}
if "msg" in field_dict:
field_dict["msg"] = record.getMessage()
return field_dict
def _make_entry(record, fields, as_pkl, as_json=False, raw_pkl=False):
"""Format the log entry."""
if as_pkl:
if raw_pkl:
return pickle.dumps(record)
return {"pkl": pickle.dumps(record)}
if as_json:
return {"json": json.dumps(_make_fields(record, fields))}
return _make_fields(record, fields)