跳转至

DBTracingProcessor

Bases: TracingProcessor

Basic tracing processor that stores span events in a database.

Required environment variables: UTU_DB_URL

源代码位于: utu/tracing/db_tracer.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class DBTracingProcessor(TracingProcessor):
    """Basic tracing processor that stores events into database.

    Only finished spans are persisted: generation and response spans are
    written as ``GenerationTracingModel`` rows and function (tool) spans as
    ``ToolTracingModel`` rows. All other hooks are no-ops.

    Required environment variables: `UTU_DB_URL`
    """

    def __init__(self) -> None:
        """Probe the database once and disable the processor if it is unusable."""
        # Checked a single time at construction; when the check fails every
        # later `on_span_end` call returns immediately.
        self.enabled = bool(SQLModelUtils.check_db_available())
        if not self.enabled:
            logger.warning("UTU_DB_URL not set or connection failed! Tracing will not be stored into database!")

    def on_trace_start(self, trace: Trace) -> None:
        """No-op: nothing is stored when a trace starts."""

    def on_trace_end(self, trace: Trace) -> None:
        """No-op: nothing is stored when a trace ends."""

    def on_span_start(self, span: Span[Any]) -> None:
        """No-op: spans are only stored once they have finished."""

    def on_span_end(self, span: Span[Any]) -> None:
        """Persist a finished span if it is a generation, response or function span.

        Args:
            span: The span that just ended. Spans carrying any other kind of
                span data are ignored.
        """
        if not self.enabled:
            return

        # Build the row first so that a session is only opened when there is
        # actually something to write.
        record = self._build_record(span)
        if record is not None:
            self._persist(record)

    def force_flush(self) -> None:
        """No-op: every record is committed as soon as its span ends."""

    def shutdown(self) -> None:
        """No-op: this processor holds no long-lived resources."""

    def _build_record(self, span: Span[Any]) -> Any:
        """Translate the span's data into a DB model instance, or None if unsupported."""
        data = span.span_data

        if isinstance(data, GenerationSpanData):
            return GenerationTracingModel(
                trace_id=self._resolve_trace_id(span),
                span_id=span.span_id,
                input=data.input,
                output=data.output,
                model=data.model,
                model_configs=data.model_config,
                usage=data.usage,
            )

        if isinstance(data, ResponseSpanData):
            response = data.response
            if response is None:
                # NOTE(review): the SDK appears to type `response` as optional
                # (e.g. when the request failed before a response existed) --
                # confirm. There is nothing to record in that case, and
                # dereferencing it would raise inside the tracing pipeline.
                return None
            return GenerationTracingModel(
                trace_id=self._resolve_trace_id(span),
                span_id=span.span_id,
                input=data.input,
                output=OpenAIUtils.get_response_output(response),
                model=OpenAIUtils.maybe_basemodel_to_dict(response.model),
                model_configs=OpenAIUtils.get_response_configs(response),
                usage=OpenAIUtils.maybe_basemodel_to_dict(response.usage),
                type="responses",
                response_id=response.id,
            )

        if isinstance(data, FunctionSpanData):
            return ToolTracingModel(
                name=data.name,
                input=data.input,
                output=data.output,
                mcp_data=data.mcp_data,
                trace_id=self._resolve_trace_id(span),
                span_id=span.span_id,
            )

        return None

    @staticmethod
    def _resolve_trace_id(span: Span[Any]) -> Any:
        """Return the current trace's id, falling back to the span's own trace id."""
        current = get_current_trace()
        if current is not None:
            return current.trace_id
        # No trace is active in this context; use the id recorded on the span
        # instead of failing on `None.trace_id`.
        return span.trace_id

    @staticmethod
    def _persist(record: Any) -> None:
        """Add a single record to a fresh session and commit it."""
        with SQLModelUtils.create_session() as session:
            session.add(record)
            session.commit()