A Coding Implementation on Loguru for Designing Robust, Structured, Concurrent, and Production-Ready Python Logging Pipelines
This tutorial demonstrates advanced Python logging with Loguru, covering structured logging, custom levels, async sinks, multiprocessing safety, and standard library interception.
The Big Picture
The article provides a comprehensive, hands-on guide to building production-ready logging pipelines with the Loguru library in Python. It walks through an idempotent logging configuration with multiple handlers, including console, in-memory, JSON, and error-specific sinks. Key features demonstrated include custom log levels, contextual logging via bind() and contextualize(), exception handling with @logger.catch, and opt() for lazy evaluation and inline colors. The tutorial also covers file management with custom rotation, compression, and retention policies, as well as thread-safe and async logging using ThreadPoolExecutor and coroutine sinks. In addition, it shows how to intercept Python's standard logging module so that third-party library logs are routed through Loguru with source-aware filtering. The article closes with a series of self-tests that verify each feature, a throughput benchmark comparing direct and enqueued logging, and a multiprocessing check.
Why It Matters
This tutorial shows how to build production-grade logging with Loguru, which is critical for debugging, monitoring, and observability in Python applications. By covering structured logging, concurrency, and standard-library interception, it addresses real-world needs for reliable, performant logging in multi-threaded, asynchronous, and multi-process code. The practical implementation helps developers avoid common pitfalls such as duplicated handlers and thread-safety issues, making robust logging pipelines easier to maintain.
In this tutorial, we implement a practical use case with Loguru, a powerful, flexible, production-ready logging library for Python. We start by building a clean, idempotent logging setup that can be safely rerun without duplicating handlers or producing messy output. From there, we move step by step through structured logging, contextual logging, custom log levels, global patching, callable formatters, and in-memory sinks. We also handle real-world logging needs such as rich exception traces, JSON log files, custom rotation, compression, retention, async logging, threaded execution, multiprocessing-safe logging, and interception of the standard logging module. By keeping everything in a Colab-ready workflow, we make it easy to test, inspect, and understand how Loguru can support debugging, monitoring, and observability in serious Python applications.
```python
!pip install -q loguru nest_asyncio
import os, sys, time, json, glob, gzip, shutil, asyncio, logging, itertools, multiprocessing
from collections import deque
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from loguru import logger

# Let asyncio.run() work inside Colab's already-running event loop.
try:
    import nest_asyncio; nest_asyncio.apply()
except Exception as e:
    print("nest_asyncio not applied:", e)

# Fresh working directory on every run.
WORKDIR = "/content/loguru_demo" if os.path.isdir("/content") else "/tmp/loguru_demo"
os.makedirs(WORKDIR, exist_ok=True); os.chdir(WORKDIR)
for f in glob.glob("*"):
    try: os.remove(f)
    except OSError: pass
print(f"Working directory: {WORKDIR}\n")

# Tiny self-test helper used throughout the tutorial.
RESULTS = []
def check(name, condition, detail=""):
    ok = bool(condition); RESULTS.append((name, ok))
    print(f" [{'PASS' if ok else 'FAIL'}] {name}" + (f" — {detail}" if detail else ""))

def banner(t): print(f"\n{'='*64}\n {t}\n{'='*64}")

# Global patcher: every record gets an env tag and an increasing sequence number.
_seq = itertools.count(1)
def global_patcher(record):
    record["extra"].setdefault("env", "colab")
    record["extra"]["seq"] = next(_seq)

# Console formatter: show {extra} only when it holds more than the default keys.
_NOISE = {"env", "seq", "app"}
def console_formatter(record):
    fmt = ("<green>{time:HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | "
           "<cyan>{name}:{function}:{line}</cyan> - <level>{message}</level>")
    if any(k not in _NOISE for k in record["extra"]):
        fmt += " | <yellow>{extra}</yellow>"
    return fmt + "\n{exception}"
```
We install Loguru and supporting dependencies, import all required libraries, and prepare a clean working directory for the tutorial. We also create a small verification helper to test each feature as the tutorial runs. We then define a global patcher and console formatter so that every log record carries useful metadata and appears in a readable format.
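Because `global_patcher` and `console_formatter` are ordinary functions that receive Loguru's record dict, their behavior can be checked without Loguru at all. The sketch below copies both functions and feeds them hand-built dicts; these stand-ins are an assumption and model only the `extra` key, whereas real Loguru records also carry keys such as `time`, `level`, and `message`.

```python
import itertools

_seq = itertools.count(1)

def global_patcher(record):
    # Same logic as the tutorial: default the env tag, stamp a sequence number.
    record["extra"].setdefault("env", "colab")
    record["extra"]["seq"] = next(_seq)

_NOISE = {"env", "seq", "app"}

def console_formatter(record):
    fmt = ("<green>{time:HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | "
           "<cyan>{name}:{function}:{line}</cyan> - <level>{message}</level>")
    # Append the extra dict only when it holds something beyond the noise keys.
    if any(k not in _NOISE for k in record["extra"]):
        fmt += " | <yellow>{extra}</yellow>"
    return fmt + "\n{exception}"

# Hypothetical stand-ins for Loguru records: only the "extra" key is modeled.
plain = {"extra": {}}
bound = {"extra": {"user_id": 42}}
for rec in (plain, bound):
    global_patcher(rec)

print(plain["extra"])  # {'env': 'colab', 'seq': 1}
print(bound["extra"])  # {'user_id': 42, 'env': 'colab', 'seq': 2}
print("{extra}" in console_formatter(plain))  # False
print("{extra}" in console_formatter(bound))  # True
```

The patcher mutates the record in place, which is the contract `logger.configure(patcher=...)` expects, and the formatter returns a template rather than finished text, so Loguru still performs the markup coloring and field substitution.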
```python
class MemorySink:
    """Keeps the last `capacity` record dicts in memory for later inspection."""
    def __init__(self, capacity=2000): self.buffer = deque(maxlen=capacity)
    def write(self, message): self.buffer.append(message.record)
    def flush(self): pass
    def has_level(self, name): return any(r["level"].name == name for r in self.buffer)
    def find(self, pred): return [r for r in self.buffer if pred(r)]

# Rotation: called with (message, file); rotate when the file would exceed MAX_BYTES.
MAX_BYTES = 1500
def size_rotation(message, file):
    return file.tell() + len(message) > MAX_BYTES

# Compression: called with the path of the rotated file.
def gzip_compression(filepath):
    with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
        shutil.copyfileobj(fi, fo)
    os.remove(filepath)

# Retention: called with a list of log files; keep only the 3 newest.
def keep_latest_retention(files):
    for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
        try: os.remove(old)
        except OSError: pass

# Route stdlib logging records into Loguru, preserving caller information.
class InterceptHandler(logging.Handler):
    def emit(self, record):
        # Use the matching Loguru level name if it exists, else the numeric level.
        try: level = logger.level(record.levelname).name
        except ValueError: level = record.levelno
        # Walk past logging's own frames so Loguru reports the real caller.
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame, depth = frame.f_back, depth + 1
        (logger.opt(depth=depth, exception=record.exc_info)
               .bind(stdlib_logger=record.name)
               .log(level, record.getMessage()))

# Child-process worker for the multiprocessing demo.
def mp_worker(n):
    logger.bind(child=os.getpid()).info("hello from child item {}", n)
    return os.getpid()
```
We create reusable logging components that make the tutorial more practical and production-like. We define an in-memory sink, along with custom file rotation, compression, and retention functions that control how logs are stored. We also build a standard logging interceptor and a multiprocessing worker to connect Loguru to external libraries and child processes.
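The rotation, compression, and retention callbacks are plain functions, so their policy can be exercised by hand before handing them to `logger.add()`. Loguru calls the rotation callable with `(message, file)`, the compression callable with the path of the rotated file, and the retention callable with a list of log file paths. The sketch below drives copies of the three helpers directly; the `io.StringIO` buffer standing in for an open log file and the fixed modification times are assumptions made so the outcome is deterministic.

```python
import glob
import gzip
import io
import os
import shutil
import tempfile

MAX_BYTES = 1500

def size_rotation(message, file):
    # Rotate once the next message would push the file past MAX_BYTES.
    return file.tell() + len(message) > MAX_BYTES

def gzip_compression(filepath):
    # Write <file>.gz next to the original, then delete the uncompressed copy.
    with open(filepath, "rb") as fi, gzip.open(filepath + ".gz", "wb") as fo:
        shutil.copyfileobj(fi, fo)
    os.remove(filepath)

def keep_latest_retention(files):
    # Keep the three most recently modified files and delete the rest.
    for old in sorted(files, key=os.path.getmtime, reverse=True)[3:]:
        try:
            os.remove(old)
        except OSError:
            pass

# 1) Rotation decision, using an in-memory buffer as a stand-in for the log file.
buf = io.StringIO()
buf.write("x" * 1490)
small_ok = size_rotation("short\n", buf)             # 1490 + 6 = 1496, no rotation
big_rotates = size_rotation("a longer line\n", buf)  # 1490 + 14 = 1504, rotate
print(small_ok, big_rotates)  # False True

# 2) Compression and retention on throwaway files in a temporary directory.
with tempfile.TemporaryDirectory() as d:
    for i in range(5):
        path = os.path.join(d, f"events_{i}.log")
        with open(path, "w") as fh:
            fh.write(f"line {i}\n" * 50)
        gzip_compression(path)
        # Give each archive a distinct, increasing mtime so "newest" is unambiguous.
        os.utime(path + ".gz", (1_000_000 + i, 1_000_000 + i))
    keep_latest_retention(glob.glob(os.path.join(d, "*.gz")))
    survivors = sorted(os.path.basename(p) for p in glob.glob(os.path.join(d, "*")))
    with gzip.open(os.path.join(d, "events_4.log.gz"), "rt") as fh:
        first_line = fh.readline().strip()

print(survivors)   # ['events_2.log.gz', 'events_3.log.gz', 'events_4.log.gz']
print(first_line)  # line 4
```

Keeping the policies as small, pure functions is what makes this kind of offline check possible; the same functions are then passed unchanged as `rotation=`, `compression=`, and `retention=`.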
We configure Loguru with multiple handlers, including console output, memory capture, JSON logging, and error logging. We then demonstrate structured logging with bound context, contextual blocks, patched records, and a custom log level. We also explore exception handling and useful opt() features such as lazy evaluation, inline colors, and record access.
```python
banner("5) custom rotation/compression/retention (forces real rotation)")
ev_id = logger.add("events_{time:HHmmss_SSS}.log",
                   rotation=size_rotation, compression=gzip_compression,
                   retention=keep_latest_retention, enqueue=True, level="DEBUG",
                   format="{time:HH:mm:ss.SSS} | {level: <8} | {message}")
for i in range(80):
    logger.bind(idx=i).debug("rotating event line number {}", i)
logger.complete(); logger.remove(ev_id)  # drain the queue before removing the sink
print(f" archives created: {sorted(glob.glob('events_*.gz'))}")

banner("6a) ThreadPoolExecutor with per-thread contextualize()")
thread_caps = []
tid = logger.add(thread_caps.append, level="DEBUG", format="{message}",
                 filter=lambda r: "worker_id" in r["extra"])
def worker(n):
    # worker_id is visible only to records emitted inside this block, in this thread.
    with logger.contextualize(worker_id=n):
        logger.info("thread work item {}", n)
    return n * n
with ThreadPoolExecutor(max_workers=8) as ex:
    sq = list(ex.map(worker, range(8)))
logger.complete(); logger.remove(tid)
worker_ids = {m.record["extra"]["worker_id"] for m in thread_caps}

banner("6b) async coroutine sink + await logger.complete()")
async def run_async_demo():
    sunk = []
    async def async_sink(message):
        await asyncio.sleep(0); sunk.append(message.record["message"])
    sid = logger.add(async_sink, level="DEBUG", catch=True)
    async def task(n):
        with logger.contextualize(coro=n):
            logger.info("async task {} start", n)
            await asyncio.sleep(0.01)
            logger.success("async task {} done", n)
    await asyncio.gather(*(task(i) for i in range(5)))
    await logger.complete()  # wait for the coroutine sink's pending tasks
    logger.remove(sid)
    return sunk

try:
    async_msgs = asyncio.run(run_async_demo())
except RuntimeError:
    async_msgs = asyncio.get_event_loop().run_until_complete(run_async_demo())
print(f" async sink received {len(async_msgs)} messages")
```
We demonstrate custom file management by automatically rotating, compressing, and retaining log files. We then test thread-safe logging by running multiple workers, each with its own contextual metadata. Finally, we add an asynchronous coroutine sink to see how Loguru handles async tasks, and we await logger.complete() so that pending messages are drained before the sink is removed.
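Both the thread demo and the coroutine demo rely on values set with `contextualize()` staying local to the unit of work that set them. For asyncio, that isolation comes from `contextvars`: each task that `asyncio.gather` creates runs in its own copy of the current context. The stdlib-only sketch below mirrors the shape of the 6b demo without Loguru (`coro_id` and `task` are illustrative names, and a plain list stands in for the sink): five tasks each emit a start and a done message, giving the 10 messages the self-test expects.

```python
import asyncio
import contextvars

# Hypothetical per-task context, standing in for logger.contextualize(coro=n).
coro_id = contextvars.ContextVar("coro_id", default=None)

async def task(n, sink):
    coro_id.set(n)  # affects only this task's copy of the context
    sink.append((coro_id.get(), f"async task {n} start"))
    await asyncio.sleep(0.01)  # other tasks run here and set their own ids
    sink.append((coro_id.get(), f"async task {n} done"))

async def main():
    sink = []
    await asyncio.gather(*(task(i, sink) for i in range(5)))
    return sink

messages = asyncio.run(main())
print(len(messages))  # 10
print(all(msg.startswith(f"async task {cid} ") for cid, msg in messages))  # True
print(coro_id.get())  # None: the tasks' changes never leaked into this context
```

Even though the tasks interleave at every `await`, each message is still tagged with the id of the task that produced it, which is the property the `coro=n` context in the tutorial depends on.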
```python
banner("7) intercept stdlib `logging` and filter a chatty library")
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
lib_caps = []
def lib_filter(record):
    # Drop sub-WARNING records from the "chatty" library; keep everything else.
    if record["extra"].get("stdlib_logger") == "chatty":
        return record["level"].no >= logger.level("WARNING").no
    return True
lid = logger.add(lib_caps.append, level="DEBUG", format="{message}",
                 filter=lambda r: ("stdlib_logger" in r["extra"]) and lib_filter(r))
logging.getLogger("chatty").info("noisy info (should be filtered out)")
logging.getLogger("chatty").warning("noisy warning (kept)")
logging.getLogger("important").debug("important debug (kept)")
logger.complete(); logger.remove(lid)

banner("8) SELF-TESTS")
logger.complete(); time.sleep(0.2)
try:
    rec = json.loads(open("structured.jsonl").read().splitlines()[-1])
    check("JSON sink serializes records", {"text", "record"} <= set(rec))
except Exception as e:
    check("JSON sink serializes records", False, str(e))
try:
    err_txt = open("errors.log").read()
    check("errors.log captured ZeroDivisionError", "ZeroDivisionError" in err_txt)
except Exception as e:
    check("errors.log captured ZeroDivisionError", False, str(e))
check("custom rotation produced .gz archives", len(glob.glob("events_*.gz")) >= 1,
      f"{len(glob.glob('events_*.gz'))} archive(s)")
check("custom NOTICE level recorded", mem.has_level("NOTICE"))
check("SUCCESS level recorded", mem.has_level("SUCCESS"))
check("bound user_id=42 present", bool(mem.find(lambda r: r["extra"].get("user_id") == 42)))
check("contextualize task present", bool(mem.find(lambda r: r["extra"].get("task") == "batch-job")))
check("global patcher stamped env", bool(mem.find(lambda r: r["extra"].get("env") == "colab")))
check("exception captured in a record", bool(mem.find(lambda r: r["exception"] is not None)))
check("threads logged all 8 workers", worker_ids == set(range(8)), str(sorted(worker_ids)))
check("async sink got 10 messages", len(async_msgs) == 10, f"{len(async_msgs)} msgs")
kept = {m.record["extra"]["stdlib_logger"] + ":" + m.record["level"].name for m in lib_caps}
check("library INFO filtered, rest kept",
      ("chatty:INFO" not in kept) and ("chatty:WARNING" in kept) and ("important:DEBUG" in kept),
      str(sorted(kept)))
```
We intercept Python’s built-in logging module and route standard library logs into Loguru. We apply source-aware filtering so that noisy logs from one library can be suppressed while important messages are still kept. We then run self-tests to verify JSON logging, error capture, archive creation, context propagation, exception records, threaded logs, async logs, and filtering behavior.
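The `InterceptHandler` approach works because records from every stdlib logger propagate (unless `propagate` is disabled) to the root logger's handlers, so a single handler installed with `basicConfig(force=True)` also sees third-party traffic. The sketch below keeps that routing but swaps Loguru for a plain list; `ForwardingHandler` and `source_filter` are hypothetical names, and here the filter runs inside the handler rather than as a Loguru sink filter as in the tutorial.

```python
import logging

captured = []  # stands in for the Loguru sink (lib_caps in the tutorial)

def source_filter(name, levelno):
    # Drop anything below WARNING from the "chatty" library; keep all else.
    if name == "chatty":
        return levelno >= logging.WARNING
    return True

class ForwardingHandler(logging.Handler):
    """Hypothetical stdlib-only analogue of InterceptHandler: forward each
    record to a plain sink, tagged with the originating logger name."""
    def emit(self, record):
        if source_filter(record.name, record.levelno):
            captured.append(f"{record.name}:{record.levelname}")

# level=0 lets DEBUG records from every logger reach the root handler.
logging.basicConfig(handlers=[ForwardingHandler()], level=0, force=True)
logging.getLogger("chatty").info("noisy info (should be filtered out)")
logging.getLogger("chatty").warning("noisy warning (kept)")
logging.getLogger("important").debug("important debug (kept)")
print(captured)  # ['chatty:WARNING', 'important:DEBUG']
```

Placing the decision in a Loguru filter, as the tutorial does with `lib_filter`, has the advantage that native Loguru calls and intercepted stdlib records pass through the same filtering mechanism.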
```python
banner("9) throughput: enqueue=False vs enqueue=True")
def bench(enqueue, n=15000):
    logger.remove()
    sid = logger.add(lambda m: None, level="DEBUG", format="{message}", enqueue=enqueue)
    t0 = time.perf_counter()
    for i in range(n): logger.bind(i=i).debug("benchmark {}", i)
    logger.complete(); dt = time.perf_counter() - t0  # include draining the queue in the timing
    logger.remove(sid)
    return n / dt
try:
    sync_tput = bench(False); async_tput = bench(True)
    print(f" direct : {sync_tput:,.0f} msg/s")
    print(f" enqueue : {async_tput:,.0f} msg/s (non-blocking, process/thread-safe)")
    check("benchmark completed", sync_tput > 0 and async_tput > 0)
except Exception as e:
    check("benchmark completed", False, str(e))

banner("10) multiprocessing with enqueue=True (fork)")
try:
    logger.remove()
    mp_id = logger.add("mp.log", enqueue=True, level="DEBUG",
                       format="{extra[child]} | {message}")
    ctx = multiprocessing.get_context("fork")  # raises ValueError where fork is unavailable
    with ProcessPoolExecutor(max_workers=4, mp_context=ctx) as ex:
        pids = list(ex.map(mp_worker, range(4)))
    logger.complete(); logger.remove(mp_id); time.sleep(0.1)
    lines = open("mp.log").read().splitlines()
    check("multiprocessing logged from children", len(lines) >= 4,
          f"{len(lines)} lines from {len(set(pids))} PIDs")
except Exception as e:
    check("multiprocessing logged from children", False, f"unsupported here: {e}")

# Restore a single, simple console handler.
logger.remove()
logger.add(sys.stderr, level="INFO", colorize=True,
           format="<green>{time:HH:mm:ss}</green> | <level>{level}</level> | {message}")
banner("RESULTS")
passed = sum(ok for _, ok in RESULTS)
for name, ok in RESULTS:
    print(f" {'✅' if ok else '❌'} {name}")
print(f"\n {passed}/{len(RESULTS)} checks passed")
print(f" files: {sorted(glob.glob('*'))}")
(logger.success if passed == len(RESULTS) else logger.warning)(
    "✅ Loguru tutorial complete!" if passed == len(RESULTS)
    else f"⚠ Completed with {len(RESULTS)-passed} failed check(s)"
)
```
We benchmark Loguru throughput by comparing direct logging with enqueue-based logging. We then test multiprocessing-safe logging by writing messages from child processes into a shared log file. Finally, we reset the logger to a single console handler, print all test results, list the generated files, and report whether the tutorial completed successfully.
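With `enqueue=True`, Loguru places each message on a queue and a background worker writes it, which is why `bench()` calls `logger.complete()` before stopping the clock; without that call, the enqueued run would only time how fast messages are queued. The standard library offers a comparable hand-off through `logging.handlers.QueueHandler` and `QueueListener`, sketched below as an analogue rather than a description of Loguru's internals. `ListHandler` and the `enqueue_demo` logger name are hypothetical, and this version uses an in-process `queue.Queue`; logging from child processes would need a multiprocessing queue instead.

```python
import logging
import logging.handlers
import queue

class ListHandler(logging.Handler):
    # The "slow" sink: runs on the listener's thread, not the caller's.
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

q = queue.Queue(-1)  # unbounded hand-off queue
sink = ListHandler()
listener = logging.handlers.QueueListener(q, sink)

log = logging.getLogger("enqueue_demo")
log.setLevel(logging.DEBUG)
log.propagate = False  # keep the demo isolated from any root handlers
log.addHandler(logging.handlers.QueueHandler(q))

listener.start()
for i in range(1000):
    log.debug("benchmark %d", i)  # returns as soon as the record is queued
listener.stop()  # processes everything still queued, then joins the thread

print(len(sink.messages))  # 1000
print(sink.messages[-1])   # benchmark 999
```

Here `listener.stop()` plays the role of `logger.complete()`: it returns only after every queued record has been handled, so reading `sink.messages` afterwards is safe.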
In conclusion, we built a complete and robust logging system using Loguru that goes far beyond basic print-style debugging. We learned how to configure multiple sinks, capture structured JSON records, add contextual metadata, preserve useful exception information, manage rotating log files, filter noisy third-party libraries, and handle concurrent workloads across threads, async tasks, and processes. We also included self-verification checks and a small benchmark to confirm that the logging pipeline works correctly and to assess its performance behavior.