A-A+

Suricata 3.1 源码分析32 (FlowWorker处理流程1)

2018年12月06日 suricata 暂无评论

/**
 * Per-packet entry point of the flow worker.
 *
 * In order, for packet 'p': update the thread time (non-pseudo packets
 * only), look up / update / lock the flow, run TCP stream handling or UDP
 * app-layer handling, run Detect if a detect thread context is set, and
 * finally unlock the flow if the packet has one.
 *
 * \param tv     thread vars; passed through to the called handlers
 * \param p      packet being processed
 * \param data   used as a FlowWorkerThreadData pointer
 * \param preq   queue that receives the extra packets dequeued from fw->pq
 *               in the TCP path
 * \param unused not referenced in this function
 *
 * \retval TM_ECODE_OK always
 */
TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, PacketQueue *unused)
{
    FlowWorkerThreadData *fw = data;
    /* Author's note: fw is initialized in FlowWorkerThreadInit and holds a
     * large number of stats counters; the decode counters live in
     * DecodeThreadVars and the TCP counters in StreamTcpThread.
     * NOTE(review): not verifiable from this function alone. */

    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
    /* Fetch the detect thread data (author's note: in 3.1 this is the worker
     * thread). When it is NULL, both Detect calls below are skipped. */

    SCLogDebug("packet %"PRIu64, p->pcap_cnt);

    /* update time */
    if (!(PKT_IS_PSEUDOPKT(p))) {
        TimeSetByThread(tv->id, &p->ts);
    }

    /* handle Flow */
    if (p->flags & PKT_WANTS_FLOW) {
        /* PKT_WANTS_FLOW is set in p->flags (author's note: TCP packets
         * always have it set), so this packet belongs to a flow and the
         * corresponding flow information has to be updated. */
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);

        FlowHandlePacket(tv, fw->dtv, p);
        if (likely(p->flow != NULL)) {
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
            FlowUpdate(p);
        }
        /* Flow is now LOCKED */

        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);

    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
     * pseudo packet created by the flow manager. */
    } else if (p->flags & PKT_HAS_FLOW) {
        FLOWLOCK_WRLOCK(p->flow);
    }
    /* From here on the packet's flow (if it has one) is held locked until
     * the FLOWLOCK_UNLOCK at the end of the function. */

    SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");

    /* handle TCP and app layer */
    if (PKT_IS_TCP(p)) {
        SCLogDebug("packet %"PRIu64" is TCP", p->pcap_cnt);
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);

        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
        StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL);
        /* Author's note: most of the stream handling happens inside
         * StreamTcp; it is analyzed in detail in a later article. */
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);

        /* Packets here can safely access p->flow as it's locked */
        SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
        Packet *x;
        /* Drain fw->pq, the queue that was handed to StreamTcp above. */
        while ((x = PacketDequeue(&fw->pq))) {
            SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);

            // TODO do we need to call StreamTcp on these pseudo packets or not?
            //StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL);
            if (detect_thread != NULL) {
                FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
                Detect(tv, x, detect_thread, NULL, NULL);
                /* Author's note: Detect is too complex to cover here; its
                 * analysis is deferred to a later article. */
                FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
            }
#if 0
            //  Outputs
#endif
            /* put these packets in the preq queue so that they are
             * handled by the other thread modules before packet 'p'. */
            PacketEnqueue(preq, x);
        }

    /* handle the app layer part of the UDP packet payload */
    } else if (p->flow && p->proto == IPPROTO_UDP) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
        AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
    }

    /* handle Detect */
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);

    if (detect_thread != NULL) {
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
        Detect(tv, p, detect_thread, NULL, NULL);
        /* Author's note: as above, Detect is not analyzed here. */
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
    }
#if 0
    // Outputs

    // StreamTcpPruneSession (from TmqhOutputPacketpool)
#endif

    if (p->flow) {
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);
        FLOWLOCK_UNLOCK(p->flow);
        /* Release the lock on the packet's flow. */
    }

    return TM_ECODE_OK;
}

 

标签:

给我留言

Copyright © 九毛的官方博客 保留所有权利.   Theme  Ality

用户登录