Redact sensitive info from debug logs (#1069)

Redacts sensitive data when debug logging device responses such as mac,
location and usernames
This commit is contained in:
Steven B.
2024-07-17 18:57:09 +01:00
committed by GitHub
parent c19389f236
commit c4f015a2fb
11 changed files with 300 additions and 39 deletions

View File

@@ -18,6 +18,7 @@ import hashlib
import logging
import struct
from abc import ABC, abstractmethod
from typing import Any, Callable, TypeVar, cast
# When support for cpython older than 3.11 is dropped
# async_timeout can be replaced with asyncio.timeout
@@ -28,6 +29,46 @@ _LOGGER = logging.getLogger(__name__)
_NO_RETRY_ERRORS = {errno.EHOSTDOWN, errno.EHOSTUNREACH, errno.ECONNREFUSED}
_UNSIGNED_INT_NETWORK_ORDER = struct.Struct(">I")
_T = TypeVar("_T")
def redact_data(data: _T, redactors: dict[str, Callable[[Any], Any] | None]) -> _T:
"""Redact sensitive data for logging."""
if not isinstance(data, (dict, list)):
return data
if isinstance(data, list):
return cast(_T, [redact_data(val, redactors) for val in data])
redacted = {**data}
for key, value in redacted.items():
if value is None:
continue
if isinstance(value, str) and not value:
continue
if key in redactors:
if redactor := redactors[key]:
try:
redacted[key] = redactor(value)
except: # noqa: E722
redacted[key] = "**REDACTEX**"
else:
redacted[key] = "**REDACTED**"
elif isinstance(value, dict):
redacted[key] = redact_data(value, redactors)
elif isinstance(value, list):
redacted[key] = [redact_data(item, redactors) for item in value]
return cast(_T, redacted)
def mask_mac(mac: str) -> str:
"""Return mac address with last two octects blanked."""
delim = ":" if ":" in mac else "-"
rest = delim.join(format(s, "02x") for s in bytes.fromhex("000000"))
return f"{mac[:8]}{delim}{rest}"
def md5(payload: bytes) -> bytes:
"""Return the MD5 hash of the payload."""