euroeval.speed_benchmark

"""Benchmarking model inference speed."""

import collections.abc as c
import logging
import typing as t

import pyinfer
from transformers.models.auto.tokenization_auto import AutoTokenizer

from .benchmark_modules import HuggingFaceEncoderModel, LiteLLMModel, VLLMModel
from .exceptions import InvalidBenchmark
from .logging_utils import get_pbar, log
from .utils import clear_memory

if t.TYPE_CHECKING:
    from .benchmark_modules import BenchmarkModule
    from .data_models import BenchmarkConfig


def benchmark_speed(
    model: "BenchmarkModule", benchmark_config: "BenchmarkConfig"
) -> c.Sequence[dict[str, float]]:
    """Benchmark model inference speed.

    Args:
        model:
            Model to use.
        benchmark_config:
            Configuration for the benchmark.

    Returns:
        A list of dictionaries, one per iteration, each containing the scores for
        that iteration.
    """
    scores: list[dict[str, float]] = list()
    for idx in get_pbar(
        iterable=range(benchmark_config.num_iterations),
        desc="Benchmarking",
        disable=not benchmark_config.progress_bar,
    ):
        itr_scores = benchmark_speed_single_iteration(model=model, itr_idx=idx)
        clear_memory()
        scores.append(itr_scores)
        log(f"Scores for iteration {idx}: {itr_scores}", level=logging.DEBUG)
    return scores


def benchmark_speed_single_iteration(
    model: "BenchmarkModule", itr_idx: int
) -> dict[str, float]:
    """Run a single iteration of the speed benchmark.

    Args:
        model:
            The model to use in the benchmark.
        itr_idx:
            The index of the iteration.

    Returns:
        A dictionary containing the scores for the current iteration.
    """
    gpt2_tokeniser = AutoTokenizer.from_pretrained("gpt2", trust_remote_code=True)

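    # Build a long document and a shorter one whose lengths grow with the iteration index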
    base_doc = "Document which contains roughly 10 tokens. "
    multiplier = 10 * (1 + itr_idx)
    doc = base_doc * multiplier
    short_multiplier = 1.25 * (1 + itr_idx)
    short_doc = base_doc * round(short_multiplier)

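    # Prediction callables for each supported model backend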
    def generate_messages_predict(doc: str) -> None:
        model.generate(inputs=dict(messages=[[dict(role="user", content=doc)]]))

    def generate_prompt_predict(doc: str) -> None:
        model.generate(inputs=dict(text=[doc]))

    def encoder_predict(doc: str) -> None:
        tokeniser = model.get_tokeniser()
        pytorch_model = model.get_pytorch_module()
        inputs = {
            key: tensor.to(pytorch_model.device)
            for key, tensor in tokeniser(
                text=[doc], truncation=True, return_tensors="pt"
            ).items()
        }
        pytorch_model(**inputs)

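    # Select the prediction function that matches the model backend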
    if isinstance(model, VLLMModel):
        predict = generate_prompt_predict
    elif isinstance(model, LiteLLMModel):
        predict = generate_messages_predict
    elif isinstance(model, HuggingFaceEncoderModel):
        predict = encoder_predict
    else:
        raise ValueError(f"Model type {model} not supported for speed benchmark")

    try:
        # Do a warmup run, as the first run is always slower
        pyinfer.InferenceReport(model=predict, inputs=base_doc, n_seconds=1).run(
            print_report=False
        )

        speed_scores = pyinfer.InferenceReport(
            model=predict, inputs=doc, n_seconds=3
        ).run(print_report=False)
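        # Convert inferences per second into GPT-2 tokens per second for the long document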
        num_gpt2_tokens = len(gpt2_tokeniser([doc], truncation=True)["input_ids"][0])
        gpt2_tokens_per_second = speed_scores["Infer(p/sec)"] * num_gpt2_tokens

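        # Repeat the measurement with the shorter document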
        speed_scores_short = pyinfer.InferenceReport(
            model=predict, inputs=short_doc, n_seconds=3
        ).run(print_report=False)
        num_gpt2_tokens_short = len(
            gpt2_tokeniser([short_doc], truncation=True)["input_ids"][0]
        )
        gpt2_tokens_per_second_short = (
            speed_scores_short["Infer(p/sec)"] * num_gpt2_tokens_short
        )

    except (RuntimeError, ValueError, IndexError) as e:
        raise InvalidBenchmark(f"Speed benchmark failed with error: {e!r}") from e

    return dict(
        test_speed=gpt2_tokens_per_second, test_speed_short=gpt2_tokens_per_second_short
    )
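

A minimal usage sketch of the public entry point is shown below. It assumes you already hold a loaded model wrapper (a VLLMModel, LiteLLMModel or HuggingFaceEncoderModel) and a BenchmarkConfig; their construction is elided here because their full signatures are not part of this module, so the placeholders are illustrative only.

    from euroeval.speed_benchmark import benchmark_speed

    # `model` must be a loaded BenchmarkModule subclass instance, and `benchmark_config`
    # a BenchmarkConfig exposing the `num_iterations` and `progress_bar` fields used above.
    model = ...             # placeholder: obtained from EuroEval's model-loading machinery
    benchmark_config = ...  # placeholder: built by EuroEval's configuration layer

    scores = benchmark_speed(model=model, benchmark_config=benchmark_config)
    for idx, itr_scores in enumerate(scores):
        print(idx, itr_scores["test_speed"], itr_scores["test_speed_short"])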