At some point in its life cycle, a software project becomes legacy. The goal then is to keep its maintenance cost at a bare minimum and “only” consume security updates. But can one actually do that without also incorporating any breaking changes? My gut feeling says no. But what does the data say?

Data

Security issues are retrieved from the RustSec Advisory Database and crate version information is extracted from the regular Crates.io Database Dumps. This is obviously a biased analysis.

Methodology

So you only want to do the bare minimum and hopefully do not need to address breaking changes in your code base? How long would it take for a security issue to pop up that forces you to do something? Or in other words: over time, how many security updates can you no longer consume?

When a security advisory for certain versions is published (= “advisory date”), we check which versions are fixed or unaffected. For the fixed/unaffected versions, we take the first compatible version – based on SemVer – and record its release date (= “earliest upgrade date”). The difference between these two dates (“advisory date” minus “earliest upgrade date”) is the maximum potential “time to chill”.
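As a rough illustration of this definition, the sketch below computes the “time to chill” for a single advisory. The helper name `time_to_chill` and both dates are made up for the example and are not taken from the data set; the sign convention follows the notebook further down (advisory date minus release date).

```python
import datetime


def time_to_chill(
    advisory_date: datetime.date,
    earliest_upgrade_date: datetime.date,
) -> int:
    """Days between the first compatible release and the advisory.

    Positive: the compatible release line already existed when the
    advisory was published, so there was time to catch up.
    Negative: the compatible release line only appeared afterwards,
    i.e. the fix was delivered as a breaking change.
    """
    return (advisory_date - earliest_upgrade_date).days


# Hypothetical example: the fix lives in a release line whose first
# version came out two years before the advisory.
advisory = datetime.date(2024, 6, 1)
first_compatible_release = datetime.date(2022, 6, 1)
days = time_to_chill(advisory, first_compatible_release)
print(days)
```

A project that had caught up with that release line at any point during those two years could consume the fix without touching a breaking change.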

Simplifying Assumptions

In this analysis, a few assumptions were made:

  • independent crates: Crate upgrades are independent. This is not always true in reality – the HTTP ecosystem being a notable example.
  • breaking change: We assume that a SemVer major version bump also results in a breaking change that requires code changes by the consumer of that crate. That is also not always true. Major version bumps may also occur due to behavior changes, or they may only affect parts of the library API that you do not care about.
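To make the notion of a “compatible” version more concrete, here is a minimal sketch of how Cargo's default (caret) rule groups versions into compatibility ranges. The function name `compat_base` is an assumption made for this example. It follows Cargo's documented rule that the left-most non-zero component defines compatibility; the notebook below is slightly coarser and groups all `0.0.x` releases under a single base.

```python
def compat_base(version: str) -> str:
    """Return the first version of the Cargo-compatible range of `version`."""
    # Ignore pre-release and build metadata for this sketch.
    core = version.split("-", 1)[0].split("+", 1)[0]
    major, minor, patch = (int(part) for part in core.split("."))

    if major > 0:
        # 1.4.2 is compatible with everything in 1.x.y
        return f"{major}.0.0"
    if minor > 0:
        # 0.3.7 is only compatible with 0.3.y
        return f"0.{minor}.0"
    # 0.0.5 is not considered compatible with any other version
    return f"0.0.{patch}"


for v in ["1.4.2", "0.3.7", "0.0.5", "2.0.0-rc.1"]:
    print(v, "->", compat_base(v))
```

Two versions with different bases are what this analysis counts as separated by a breaking change.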

Processing

Data is processed like this:

  • patched / unaffected: We treat both “patched” and “unaffected” crates as potential fixes. That’s conservative, because sometimes you cannot “go back” to an unaffected version that doesn’t have other bug fixes / features yet.
  • informal advisories: We filter out informal advisories marked as “unmaintained” and “unsound”. While both are ticking time bombs, they are not concrete issues yet.
  • pre-releases: We assume you won’t directly deploy pre-releases in production, so they do not count for the “time to chill” calculation.
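The two filters from the list above can be sketched in plain Python as follows. The `Advisory` class, the helper names, and all sample values are hypothetical and only serve as illustration; the actual notebook applies the same conditions on data frames.

```python
from dataclasses import dataclass
from typing import Optional

# Informational advisory types that are skipped in this analysis.
SKIPPED_TYPES = {"unmaintained", "unsound"}


@dataclass
class Advisory:
    advisory_id: str
    informational: Optional[str] = None  # None means a regular advisory


def is_concrete(advisory: Advisory) -> bool:
    """Keep everything that is not marked as unmaintained or unsound."""
    return advisory.informational not in SKIPPED_TYPES


def is_prerelease(version: str) -> bool:
    """A SemVer pre-release has a `-` suffix before any `+build` metadata."""
    without_build = version.split("+", 1)[0]
    return "-" in without_build


# Hypothetical inputs, not taken from the real data set.
advisories = [
    Advisory("EXAMPLE-0001"),
    Advisory("EXAMPLE-0002", informational="unmaintained"),
    Advisory("EXAMPLE-0003", informational="unsound"),
]
versions = ["1.0.0", "1.1.0-rc.1", "1.1.0", "1.2.0+build-7"]

kept_advisories = [a.advisory_id for a in advisories if is_concrete(a)]
kept_versions = [v for v in versions if not is_prerelease(v)]
print(kept_advisories)
print(kept_versions)
```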

Weighting

To present a nice, high-level result, a quantile plot was chosen. For that we somehow need to weight / count security advisories. Simply treating all security advisories as equal and calculating the quantiles based on that does not seem fair, since some libraries and security issues clearly have a bigger impact than others. Small libraries with little use can publish advisories all they want, but these will likely have little effect on the ecosystem. Now ideally you would filter the crates depending on your code base and even weight security advisories by some metric that is important to you – e.g. CVSS. Without specific knowledge about your code base, however, we need to make generic assumptions. So for each advisory, we add up the crate download counts (both pre-patch and post-patch) and weight the advisory based on that.
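A minimal sketch of the effect of this weighting, using invented numbers: each sample is a pair of “time to chill” in days and a download-based weight, and the function returns the weighted share of advisories that still leave at least a given amount of time. The function name and the sample values are assumptions for illustration only.

```python
def share_with_at_least(samples: list[tuple[int, int]], min_days: int) -> float:
    """Weighted share of advisories with a time to chill >= `min_days`.

    `samples` holds (time_to_chill_days, weight) pairs, where the weight
    stands in for the summed download count of the affected crate.
    """
    total = sum(weight for _, weight in samples)
    kept = sum(weight for days, weight in samples if days >= min_days)
    return kept / total


# Hypothetical advisories: (time to chill in days, downloads).
samples = [
    (800, 1_000),
    (400, 3_000),
    (100, 5_000),
    (-30, 1_000),
]

weighted = share_with_at_least(samples, 365)
# For comparison: treat every advisory as equally important.
unweighted = share_with_at_least([(days, 1) for days, _ in samples], 365)
print(f"weighted: {weighted:.0%}, unweighted: {unweighted:.0%}")
```

In this toy data set half of the advisories leave more than a year of slack, but they only account for 40% of the downloads, which is why the weighted number is lower.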

Results

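phase                            | time to chill | weighted quantile
---------------------------------|---------------|------------------
fix delivered as breaking change | -2y           | 100.0%
                                 | -1y           | 99.8%
                                 | 0y            | 97.3%
there was time to catch up       | 1y            | 72.8%
                                 | 2y            | 62.9%
                                 | 3y            | 50.8%
                                 | 4y            | 36.8%
                                 | 5y            | 29.6%
                                 | 6y            | 17.1%
                                 | 7y            | 8.2%
                                 | 8y            | 5.9%
                                 | 9y            | 5.9%
                                 | 10y           | 4.7%
                                 | 11y           | 2.1%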

Given the assumptions in this analysis, this means: if you do not follow any breaking changes for 1 year, you will only be able to consume about 72.8% of security updates without major effort.

The case of RUSTSEC-2022-0093

Sometimes crates are fixed long after an issue is reported. There is one particular case of RUSTSEC-2022-0093 which affected the rather popular crate ed25519-dalek. Looking at rustsec/advisory-db#1744 one might wonder why the PR was filed 2023-08-14, but the advisory data itself names 2022-06-11. Turns out there is a longer backstory to this, which you can read here: github.com/MystenLabs/ed25519-unsafe-libs.

What to do

The data seems to indicate that your legacy software project is slowly going to degrade if you do not upgrade your dependencies regularly. The cost of keeping your software free of security bugs can however be reduced:

  • delayed upgrades: You don’t have to perform upgrades immediately. You can either batch your upgrades or delay them until a security upgrade demands them. This however increases the risk that a security upgrade requires invasive changes because of the missed catch-up.
  • zero-effort upgrades: Some major version upgrades may be rather easy or simply require no code changes at all on your side.
  • (semi-)automated upgrades: Tools like AI agents can help you to fix the fall-out of breaking changes in your code base.
  • pay upstream: You can pay upstream – either with money or by employing maintainers – to backport critical bug fixes.

Technical Details

The entire processing was done in Python in a marimo notebook.

Code for notebook.py
# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "altair==6.0.0",
#     "cvss==3.6",
#     "marimo",
#     "polars==1.36.1",
#     "requests==2.32.5",
#     "semver==3.0.4",
# ]
# ///

import marimo

__generated_with = "0.19.7"
app = marimo.App(width="medium")


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    # Rustsec Analysis
    """)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Setup
    """)
    return


@app.cell
def _():
    import dataclasses
    import datetime
    import fnmatch
    import io
    import tomllib
    import zipfile

    from typing import Self

    import altair as alt
    import marimo as mo
    import polars as pl

    import cvss
    import requests
    import semver
    return (
        Self,
        alt,
        cvss,
        dataclasses,
        datetime,
        fnmatch,
        io,
        mo,
        pl,
        requests,
        semver,
        tomllib,
        zipfile,
    )


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Rustsec Data Import
    """)
    return


@app.function
def toml_block_from_markdown(md: str) -> str:
    md_lines = iter(md.splitlines(keepends=False))

    for line in md_lines:
        if line == "```toml":
            break

    toml_lines = []
    for line in md_lines:
        if line == "```":
            break
        toml_lines.append(line)

    return "\n".join(toml_lines)


@app.cell
def _(fnmatch, io, requests, tomllib, zipfile):
    resp = requests.get(
        "https://github.com/rustsec/advisory-db/archive/refs/heads/main.zip"
    )
    resp.raise_for_status()

    advisories = []

    with zipfile.ZipFile(io.BytesIO(resp.content)) as fz:
        for entry in fnmatch.filter(
            fz.namelist(), "advisory-db-main/crates/**/*.md"
        ):
            with fz.open(entry) as f:
                data = tomllib.loads(
                    toml_block_from_markdown(f.read().decode("utf8"))
                )
                advisories.append(data)
    return advisories, data


@app.cell
def _(data):
    data
    return


@app.cell
def _(cvss):
    def cvss_score(vector: str) -> float:
        if vector.startswith("CVSS:3"):
            return max(cvss.CVSS3(vector).scores())
        else:
            return max(cvss.CVSS4(vector).scores())
    return (cvss_score,)


@app.cell
def _(advisories, cvss_score, datetime, pl):
    df_advisories = (
        pl.DataFrame(
            {
                "advisory_id": [a["advisory"]["id"] for a in advisories],
                "crate": [a["advisory"]["package"] for a in advisories],
                "advisory_date": [
                    datetime.date.fromisoformat(a["advisory"]["date"])
                    for a in advisories
                ],
                "unaffected": [
                    a["versions"].get("unaffected") for a in advisories
                ],
                "patched": [a["versions"].get("patched") for a in advisories],
                "cvss": [a["advisory"].get("cvss") for a in advisories],
                "categories": [
                    a["advisory"].get("categories") for a in advisories
                ],
                "type": [a["advisory"].get("informational") for a in advisories],
            }
        )
        .with_columns(pl.col("cvss").map_elements(cvss_score, return_dtype=float))
        .with_columns(pl.col("cvss").alias("advisory_score"))
    )
    df_advisories
    return (df_advisories,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Version Handling

    We need to handle `cargo` versions properly.
    """)
    return


@app.cell
def _(Self, dataclasses, semver):
    @dataclasses.dataclass
    class CargoVersion:
        base: str
        v: str
        next: str

        @classmethod
        def from_str(cls, v: str) -> Self:
            parsed = semver.Version.parse(v)

            match (parsed.major, parsed.minor):
                case (0, 0):
                    base = "0.0.0"
                    next = str(parsed.bump_patch())
                case (0, _):
                    base = f"0.{parsed.minor}.0"
                    next = str(parsed.bump_minor())
                case (_, _):
                    base = f"{parsed.major}.0.0"
                    next = str(parsed.bump_major())

            return cls(base=base, v=v, next=next)


    def _expand_base_version(v: str) -> str:
        v = v.strip()

        match v.count("."):
            case 0:
                return f"{v}.0.0"
            case 1:
                return f"{v}.0"
            case 2:
                return v
            case 3:
                return v
            case _:
                raise Exception(f"invalid version: `{v}`")


    def _expand_plain_version(v: str) -> list[str]:
        v = _expand_base_version(v)
        return [f">={v}", f"<{CargoVersion.from_str(v).next}"]


    def expand_version_expr(expr: str) -> list[str]:
        expr = expr.strip()

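        if expr.startswith("^") or expr.startswith("="):
            # NOTE: simplification: `^` and `=` are both passed on as a bare
            # version, i.e. as an exact match.
            expr = expr[1:]
            assert expr.count(".") == 2
            return [expr]
        elif expr.startswith("~"):
            # NOTE: simplification: `~` gets the same range as a bare version
            # (up to the next incompatible release).
            expr = expr[1:]
            return _expand_plain_version(expr)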
        elif "0" <= expr[0] <= "9":
            return _expand_plain_version(expr)
        elif expr.startswith(">="):
            expr = expr[2:]
            return [f">={_expand_base_version(expr)}"]
        elif expr.startswith("<"):
            expr = expr[1:]
            return [f"<{_expand_base_version(expr)}"]
        elif expr.startswith(">"):
            expr = expr[1:]
            return [f">{_expand_base_version(expr)}"]
        else:
            raise Exception(f"invalid expression: `{expr}`")
    return CargoVersion, expand_version_expr


@app.cell
def _(expand_version_expr, semver):
    def semver_match(version: str, constraints: list[str]) -> bool:
        return any(
            (
                all(
                    (
                        semver.match(version, expr.replace(" ", ""))
                        for part in constraint.split(",")
                        for expr in expand_version_expr(part.strip())
                    )
                )
                for constraint in constraints
            )
        )
    return (semver_match,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## crates.io Data Import

    See <https://crates.io/data-access#database-dumps>.
    """)
    return


@app.cell
def _():
    crates_io_date_format = "%Y-%m-%d %H:%M:%S%.f%#z"
    return (crates_io_date_format,)


@app.cell
def _(mo, pl):
    df_crates = pl.read_csv(
        mo.notebook_dir() / "crates_io_db_dump" / "data" / "crates.csv"
    ).rename({"id": "crate_id", "name": "crate"})
    df_crates
    return (df_crates,)


@app.cell
def _(CargoVersion, crates_io_date_format, mo, pl, semver):
    df_versions = (
        pl.read_csv(
            mo.notebook_dir() / "crates_io_db_dump" / "data" / "versions.csv"
        )
        .rename(
            {
                "num": "version",
                "created_at": "version_date",
                "updated_at": "version_update",
            }
        )
        .with_columns(
            pl.col("version_date").str.to_datetime(crates_io_date_format),
            pl.col("version_update").str.to_datetime(crates_io_date_format),
            (
                pl.col("version")
                .map_elements(
                    lambda v: semver.Version.parse(v).prerelease is not None,
                    return_dtype=bool,
                )
                .alias("is_prerelease")
            ),
            (
                pl.col("version")
                .map_elements(
                    lambda v: CargoVersion.from_str(v).base, return_dtype=str
                )
                .alias("base")
            ),
        )
    )
    df_versions
    return (df_versions,)


@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""
    ## Merge Data
    """)
    return


@app.cell
def _(df_advisories, df_crates, df_versions, pl, semver_match):
    df_merged = (
        df_advisories.join(
            other=df_crates.select("crate", "crate_id"),
            on="crate",
        )
        .join(
            other=df_versions.select(
                "crate_id",
                "version",
                "version_date",
                "base",
                "downloads",
                "is_prerelease",
            ),
            on="crate_id",
        )
        .with_columns(
            pl.struct("version", "patched")
            .map_elements(
                lambda x: (
                    semver_match(x["version"], x["patched"])
                    if x["patched"]
                    else False
                ),
                return_dtype=bool,
            )
            .alias("is_patched"),
            pl.struct("version", "unaffected")
            .map_elements(
                lambda x: (
                    semver_match(x["version"], x["unaffected"])
                    if x["unaffected"]
                    else False
                ),
                return_dtype=bool,
            )
            .alias("is_unaffected"),
        )
    )
    df_merged
    return (df_merged,)


@app.cell
def _(df_merged, pl):
    df_impact = df_merged.group_by("advisory_id").agg(
        pl.col("downloads").sum().alias("impact")
    )
    df_impact
    return (df_impact,)


@app.cell
def _(df_advisories, df_crates, df_impact, df_merged, df_versions, pl):
    df_leadtime = (
        # get "good" versions for each advisory
        df_merged.filter(
            (pl.col("is_patched") | pl.col("is_unaffected"))
            & (~pl.col("type").is_in(["unmaintained", "unsound"])).fill_null(True)
        )
        # select unique (advisory, base) tuples, "base" is the compatibility range
        .select(
            "advisory_id",
            "base",
        )
        .unique()
        # join versions for each base
        .join(
            df_advisories.select("advisory_id", "crate"),
            on="advisory_id",
        )
        .join(
            df_crates.select("crate_id", "crate"),
            on=["crate"],
        )
        .join(
            df_versions.filter(~pl.col("is_prerelease")), on=["base", "crate_id"]
        )
        # find earliest version that is compatible (using "base") for each "good" version
        .group_by("advisory_id")
        .agg(
            # TODO: This base is not semantically correct (i.e. cannot use string-based min).
            #       This is only used for debugging though and does not influence the plot.
            pl.col("base").min(),
            pl.col("version_date").min(),
        )
        # bring back some columns
        .join(df_advisories, on="advisory_id")
        .join(df_impact, on="advisory_id")
        .join(
            df_crates.select("crate_id", "crate"),
            on=["crate"],
        )
        # calculate lead time
        .with_columns(
            (pl.col("advisory_date") - pl.col("version_date"))
            .dt.total_days()
            .alias("lead_time")
        )
        .sort("advisory_id")
    )

    df_leadtime
    return (df_leadtime,)


@app.cell
def _(alt, df_leadtime, pl):
    def _calc_labels(min: int, max: int, step: int) -> list[int]:
        values = [0]

        current = step
        while True:
            values.append(current)
            current += step
            if current > max:
                break

        current = -step
        while True:
            values.append(current)
            current -= step
            if current < min:
                break

        return sorted(values)


    df_plot = (
        df_leadtime.sort("lead_time", descending=True)
        .with_columns(
            (pl.col("impact").cum_sum() / df_leadtime["impact"].sum()).alias("cum")
        )
        .sort("lead_time")
    )

    # add two extra rows: one for the max point and one for the zero point
    df_plot = pl.concat(
        [
            pl.DataFrame(
                {"lead_time": [df_leadtime["lead_time"].min()], "cum": [1.0]}
            ),
            df_plot,
            pl.DataFrame(
                {"lead_time": [df_leadtime["lead_time"].max()], "cum": [0.0]}
            ),
        ],
        how="diagonal",
    )

    x_labels = _calc_labels(
        df_leadtime["lead_time"].min(),
        df_leadtime["lead_time"].max(),
        365,
    )
    y_labels = [
        df_plot["cum"][max(0, df_plot["lead_time"].search_sorted(l) - 1)]
        for l in x_labels
    ]
    label_map = dict(zip(x_labels, y_labels))

    c_plot = (
        alt.Chart(df_plot)
        .mark_line(color="black", interpolate="step-after")
        .transform_joinaggregate(total="sum(impact)")
        .encode(
            x=alt.X(
                "lead_time:Q",
            ),
            y=alt.Y(
                "cum:Q",
            ),
        )
    )

    arrow_offset = 100
    text_y = 1.075
    pointer_offset = 20
    c_annotation = alt.layer(
        (
            alt.Chart()
            .mark_text(align="center")
            .encode(
                x=alt.datum(arrow_offset),
                y=alt.datum(text_y),
                text=alt.datum(["⟹"]),
            )
        ),
        (
            alt.Chart()
            .mark_text(align="left")
            .encode(
                x=alt.datum(2 * arrow_offset),
                y=alt.datum(text_y),
                text=alt.datum(
                    [
                        "there was time before",
                        "the advisory to catch",
                        "up to latest breaking",
                        "change",
                    ]
                ),
            )
        ),
        (
            alt.Chart()
            .mark_text(align="center")
            .encode(
                x=alt.datum(-arrow_offset),
                y=alt.datum(text_y),
                text=alt.datum(["⟸"]),
            )
        ),
        (
            alt.Chart()
            .mark_text(align="right")
            .encode(
                x=alt.datum(-2 * arrow_offset),
                y=alt.datum(text_y),
                text=alt.datum(["fix delivered as", "breaking change"]),
            )
        ),
        (
            alt.Chart()
            .mark_text(
                align="center",
                dx=-0.75 * pointer_offset,
                fontSize=20,
                angle=135,
                baseline="middle",
            )
            .encode(
                x=alt.datum(365),
                y=alt.datum(label_map[365]),
                text=alt.datum("➡"),
            )
        ),
        (
            alt.Chart()
            .mark_text(
                align="left",
                dx=pointer_offset,
                dy=-pointer_offset,
                baseline="bottom",
            )
            .encode(
                x=alt.datum(365),
                y=alt.datum(label_map[365]),
                text=alt.datum(
                    [
                        "after 1 year of NOT",
                        "catching up with breaking changes,",
                        f"you only get ≈{int(round(label_map[365] * 100))}% security patches",
                    ]
                ),
            )
        ),
        (
            alt.Chart()
            .mark_rule(stroke="black")
            .encode(x=alt.datum(0), y=alt.datum(0), y2=alt.datum(text_y + 0.01))
        ),
    )

    c_grid_base = alt.Chart(
        pl.DataFrame({"x": x_labels, "y": y_labels})
    ).mark_rule(stroke="lightgray")
    c_grid = alt.layer(
        (
            c_grid_base.encode(
                x=alt.datum(
                    df_leadtime["lead_time"].min(),
                ),
                x2=alt.X("x"),
                y=alt.Y("y"),
            )
        ),
        (
            c_grid_base.encode(
                x=alt.X("x"),
                y=alt.datum(0),
                y2=alt.Y("y"),
            )
        ),
    )

    alt.layer(c_grid, c_plot, c_annotation).encode(
        x=alt.X(
            axis=alt.Axis(
                title="time to chill, duration after breaking change until security event",
                values=x_labels,
                labelExpr="datum.value / 365 + 'y'",
                grid=False,
            ),
            scale=alt.Scale(
                domain=[
                    df_leadtime["lead_time"].min(),
                    df_leadtime["lead_time"].max(),
                ],
            ),
        ),
        y=alt.Y(
            axis=alt.Axis(
                format="%",
                title="cumulative weighted amount of fixes",
                values=y_labels,
                labelOverlap=False,
                labelFontSize=8,
                grid=False,
            ),
            scale=alt.Scale(domain=[0, 1]),
        ),
    ).properties(
        height=600,
        width=600,
        title=alt.TitleParams(
            text="Breaking Release before Security Fix (Rust)",
            subtitle=[
                "https://crepererum.net/only-security-updates",
            ],
            subtitlePadding=10,
            offset=-120,
            align="right",
            anchor="end",
        ),
        config=alt.Config(view={"stroke": "transparent"}),
    )
    return x_labels, y_labels


@app.cell
def _(pl, x_labels, y_labels):
    pl.DataFrame(
        {
            "lead time": [f"{d / 365:.0f}y" for d in x_labels],
            "quantile": [f"{y * 100:.1f}%" for y in y_labels],
        }
    )
    return


if __name__ == "__main__":
    app.run()