Skip to main content

Command Palette

Search for a command to run...

The implementation of anti-backflow logic based on Linux real-time threads and UDP fast channels.(PART ONE)

Published
7 min read

这段时间接触了一个防逆流的需求开发,整理一下基本的控制策略。

一、 防逆流系统的核心逻辑流程

整个防逆流过程可以被划分为:采样监测 -> 逻辑判定 -> 信号触发 -> 快速执行 四个阶段。我们按照分层的思想来讲解:

  • 感知层:RS485 电表采样。

  • 决策层:EMS 站级控制算法。

  • 执行层:PCS 逆变器功率调整。

同时通讯双轨制

  • TCP(控制面):处理心跳、配置、注册,保障 100% 到达。

  • UDP(数据面):专为防逆流设计的“快速通道”,规避拥塞控制,追求极低延迟。

二、技术解答

1. 采样监测(判定阶段)

“紧急警报器”安装,分为485电表线程,单柜的初始化函数,站控的初始化函数,以及站控的上报函数

    pWhim_Rough->nSemCCUWith485Meter = Creat_SemID(SEM_CCU_485_PATH, SEM_CCU_485_PROJ_ID, 1, IPC_CREAT | 0666);
    printf("whMeterSampler semid = [%d]\n",pWhim_Rough->nSemCCUWith485Meter);
    LogOut(MAIN_MODULE_MAIN, "whMeterSampler", LOG_TYPE_INFO,
        "whMeterSampler semid --> [%d] !\n",pWhim_Rough->nSemCCUWith485Meter);
    if (pWhim_Rough->nSemCCUWith485Meter == -1)
    {
        LogOut(MAIN_MODULE_REPORTER, "whMeterSampler", LOG_TYPE_ERROR, 
            "Init SEMID FAILED!!!");
    }​

它在系统启动阶段申请了一个 System-V 信号量 (Semaphore)

  • 核心功能:创建一个跨进程/线程的“计数器”。

  • Creat_SemID:利用 ftok 产生的唯一 Key 获取一个信号量 ID。

  • IPC_CREAT | 0666:如果没有就创建,并赋予读写权限。

  • 目的:为后续的异步控制提供物理句柄。没有这个 ID,判定函数就找不到“警报开关”,处理线程也收不到“警报信号”。

Judge_ActvPower_Reflux 函数中,系统实时监控电力仪表的有功功率:电表采样线程通道,电表采样Modbus快表。

  • 双表比对:系统同时获取“柜内表(PCS侧)”和“网侧表(Grid侧)”的数据。

  • 判定标准

    • fGridPwr_Sum < 0:代表电网侧检测到功率反向流动(即逆流发生)。

    • fCabPwr_Sum < 0:代表储能系统正处于放电状态。

  • 结论:当两者同时为负时,说明储能放电过大,导致电能倒灌回电网,触发防逆流调节。

    2. 逻辑判定

  • 函数分析Judge_ActvPower_Reflux(单柜),站控(_ThreadEntry_SlaveUDPComm

  • 逻辑关键

    • 双表比对:网侧表 (Grid) 与柜内表 (Cabinet) 功率方向的一致性判定。

    • 功率阈值计算:何时触发 sem_post_op

    • 代码段展示:分析有功功率为负时的处理逻辑。

....................
    //init ccu and slave reporter sem
    pSlave->nSemWithCCUAndSlave = Creat_SemID(SEM_CCU_485_PATH, SEM_CCU_485_PROJ_ID, 1, IPC_CREAT | 0666);
    TRACE("slave reporter Init semid = [%d]\n",pSlave->nSemWithCCUAndSlave);
    LogOut(MAIN_MODULE_MAIN, "SLAVE_UDP_REPORTER", LOG_TYPE_INFO,
        "SLAVE_UDP_REPORTER semid Init --> [%d] !\n",pSlave->nSemWithCCUAndSlave);
    if (pSlave->nSemWithCCUAndSlave != -1)
    {
        Init_Sem(pSlave->nSemWithCCUAndSlave, 0, SETVAL, 0);
    }

    if (!ReOpen_QuicklyCommPort_UDP_CLIENT(pReporter))
    {
        TRACEE("Slave Initialize ReOpen_QuicklyCommPort_UDP_CLIENT failed");
        return FALSE;
    }
    pReporter->hThread = RunThread_Create("SlaveTCPComm",
        (RUN_THREAD_START_PROC)_ThreadEntry_SlaveTCPComm,
        (void *)(pReporter),
        (DWORD*)&pReporter->bExit);
    if(pReporter->hThread == NULL)
    {
        LogOut(MAIN_MODULE_REPORTER, "SlaveTCPComm", LOG_TYPE_ERROR, 
            "Init SlaveTCPComm FAILED!!!");    
        return FALSE;
    }

    //----- Create A Thread To reveive the UDP messages from SEMS -----//
    struct sched_param param;
    pthread_attr_t attr;
    pthread_t hThread;

    pthread_attr_init(&attr);
    param.sched_priority = 1;    //数值越大,优先级越高。
    pthread_attr_setschedpolicy(&attr,SCHED_RR);    //实时调度策略
    pthread_attr_setschedparam(&attr,&param);    //设置优先级
    pthread_attr_setinheritsched(&attr,PTHREAD_EXPLICIT_SCHED);    //要使优先级其作用必须要有这句话

    if(pthread_create(&hThread, &attr, _ThreadEntry_SlaveUDPComm,
        (void *)pReporter) != 0) //0-Success
    {
        TRACE("----- Create _ThreadEntry_SlaveUDPComm Failed -----\n");
        LogOut(MAIN_MODULE_REPORTER, "SlaveUDPComm", LOG_TYPE_ERROR, 
            "Init _ThreadEntry_SlaveUDPComm FAILED!!!");
        pthread_attr_destroy(&attr);
        return FALSE;
    }
.................
static DWORD _ThreadEntry_SlaveUDPComm(void* pArg)
{
    REPORTER* pReporter = (REPORTER*)pArg;
    ROUGH_SLAVE_ROUGH* pSlave = (ROUGH_SLAVE_ROUGH*)pReporter->pDataHeap;
    BYTE byBuff[TCP_BUFF_LEN];
    int nReadBytes = 0;
    memset(byBuff, 0, TCP_BUFF_LEN);

    while(!pReporter->bExit)
    {
        //block wait receive 6330-UCTRL
        nReadBytes = s_pUDPCommClient->Read(s_hUDPPortClient, byBuff, TCP_BUFF_LEN);
        TRACE("_ThreadEntry_SlaveUDPComm Read...nReadBytes=[%d]\n",nReadBytes);

        if(nReadBytes >= NLMSG_MIN_LEN)
        {
            //now try to find the frame and parse it
            BYTE* pFoundPos = byBuff;
            do
            {
                BYTE *pDataPos;
                USHORT usDataLen;
                NLMSG_FRAME_INFO* pFrm = Find_NLMsgFrame(pSlave, pFoundPos, nReadBytes, &pDataPos, &usDataLen);
                if(pFrm == NULL)
                {
                    TRACE("---Get Frm Is NULL---\n");
                    break;
                }
                else if(/*(pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_CFGG].byCmd)||
                        (pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_CFGS].byCmd)||
                        (pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_USYS].byCmd)||
                        (pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_RTIF].byCmd)||*/
                        (pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_UCTL].byCmd))
                {
                    TRACE("slave Do UDP Parsing byCmd=[%02X]\n",pFrm->byCmd);
                    //TRACE("NL_MSG_CFGG[%d] NL_MSG_CFGS[%d] NL_MSG_UCTL[%d] NL_MSG_RTIF[%d] NL_MSG_USYS[%d] ",pSlave->nlFrameInfo[NL_MSG_CFGG].byCmd,pSlave->nlFrameInfo[NL_MSG_CFGS].byCmd,pSlave->nlFrameInfo[NL_MSG_UCTL].byCmd,pSlave->nlFrameInfo[NL_MSG_RTIF].byCmd,pSlave->nlFrameInfo[NL_MSG_USYS].byCmd);
                    pFrm->pfnParser(pSlave, pDataPos, usDataLen, PROTOCOL_UDP);
                    if (pSlave->nSemWithCCUAndSlave != -1)
                    {
                        // static time_t tmDebugOutSentSem = 0;
                        // if (ElapseS(tmDebugOutSentSem) > 30)
                        // {
                        //     tmDebugOutSentSem = _NOW_;
                        //    TRACEI("============================slaveReporter sent sem to ccusampler time [%0.3f]============================\n",tmNow);
                        // }
                        sem_post_op(pSlave->nSemWithCCUAndSlave);//send sem to ccusampler
                    }
                }
                else
                {
                    TRACE("Info: Maybe Other Msg byCmd=[%02X]\n", pFrm->byCmd);
                }

                nReadBytes -= usDataLen + NLMSG_MIN_LEN;
                pFoundPos += usDataLen + NLMSG_MIN_LEN;
            } while (nReadBytes >= NLMSG_MIN_LEN);
        }
    }
}
static BOOL Judge_ActvPower_Reflux(IN RS485_SAMPLER_DATA* pWhim_Rough)
{
    //如果没有安装柜内表或者网侧表,就不处理。例如站级EMS-从设备没有安装网侧表,只安装了柜内表;不需要处理
    //站级EMS 快速防逆流必须接柜内表和网侧表
    //从设备  只安装了柜内表
    //一体机  安装了柜内表和网侧表
    if((pWhim_Rough->meterData[407 - KWHMETER_INDEX_START].emMeterType == METER_TYPE_NotInstalled) 
        || (pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].emMeterType == METER_TYPE_NotInstalled))
    {
        return 0;
    }
    char TempBuf[512];
    float fGridPwr_Sum = 0,  fGridPwr_A = 0,  fGridPwr_B = 0,  fGridPwr_C = 0, fCabPwr_Sum = 0;
    double tmNow = GetCurrentTime();    // current time.

    fCabPwr_Sum = pWhim_Rough->meterData[407 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_RTACTPower].fValue;  //cab

    fGridPwr_Sum = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_RTACTPower].fValue; //grid meter
    fGridPwr_A = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_ACTPower_PhA].fValue;
    fGridPwr_B = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_ACTPower_PhB].fValue;
    fGridPwr_C = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_ACTPower_PhC].fValue;

    int n;
    for(n = 0 ; n < MAX_WH_NUM; n++)
    {    
        if(pWhim_Rough->meterData[n].bIsCommFail == FALSE)
        {
            continue;

        }
        if(ElapseMS(LastSentPostTime) > 10000)
        {
            LastSentPostTime = tmNow;
            TRACEW("Meter Alarm [%d] time [ %0.3f ]", n, tmNow);
            // LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING,
            //     "Meter Alarm [%d]",n);
            sem_post_op(pWhim_Rough->nSemCCUWith485Meter);    //send sem to ccusampler
            return TRUE;
        }
    }

    if ((fCabPwr_Sum < 0) && (fGridPwr_Sum < 0))
    {
        if (pWhim_Rough->nSemCCUWith485Meter != -1)
        {
            TRACEW("Start to Adjust ---Grid[ %0.3f ] Cab[ %0.3f ] time [ %0.3f ]",fGridPwr_Sum,fCabPwr_Sum,tmNow);
            // LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING,
            //     "Start to Adjust ---Grid[ %0.3f ] Cab[ %0.3f ]",fGridPwr_Sum,fCabPwr_Sum);
            LastSentPostTime = GetCurrentTime();
            sem_post_op(pWhim_Rough->nSemCCUWith485Meter);    //send sem to ccusampler
        }
    }
    return TRUE;
}

3. 信号触发(同步阶段)

为了追求极致的响应速度,系统放弃了传统的轮询(Polling),改用事件驱动(Event-Driven)

  • 一旦判定逆流,调用 sem_post_op(信号量释放)。上面代码Judge_ActvPower_Reflux中的sem_post_op释放信号量。

  • 此动作会瞬间唤醒处于阻塞等待状态的高优先级执行线程。

int Creat_SemID(const char *pathname,int proj_id,int nsems ,int semflg)
{
    key_t key = ftok(pathname, proj_id);
    int semid = semget(key, nsems, semflg);
    return semid;
}

int Init_Sem(int semID,int nsems,int cmd,int val)
{

    int Ret = semctl(semID,nsems,cmd,val);  
    return Ret;
}

void sem_wait_op(int semid) 
{
    struct sembuf sop = {0, -1, 0};
    semop(semid, &sop, 1);
}

void sem_post_op(int semid) 
{
    struct sembuf sop = {0, 1, 0};
    semop(semid, &sop, 1);
}

4. 快速执行(调度阶段)

_thread_SEMS_Process_SemFrom485 线程中:

  • 重新计算:调用 Ems_Mgmt_Main_SEMS,根据当前逆流的功率差值,计算出每个储能节点(Node)应该调整到的目标功率。

  • 指令下发:站控则通过 Send_6330UctlWorkPwrAuto_UDP 发送控制指令。单柜则通过下发Send_CCU_Ctrl

      BOOL Send_6330UctlWorkPwrAuto_UDP(ROUGH_CCU_DATA* pCCU, ACDC_WORKSTATU emACDC_WorkStatuReq, int n, int msElapse)
      {
          EMS_MGMT_STU *pEMS = &pCCU->emsMgmtData;
          A_BLOB_BRANCH_NODE_CALC_DATA* pCalc_Node =   &pEMS->calc_Node[n];
          A_BLOB_BRANCH_GRP_RECTDATA* pBranchGrpData = &pCCU->strLSYSInfo.branchNodeData[n];
          PcsSMInfo* pSMInfo =                         &pCCU->semsSMInfos[n];
          SEMS_B6_LSYS_DATA_INFO* pB6LSYS =            &pBranchGrpData->b6LSYSLast_SEMS;
          double tmNow = GetCurrentTime();
          S63Uctl_30WorkPwr* pCtrlSend =               &pSMInfo->sems6330ctrlPwrReq;
          pCtrlSend->subCmd =     0x30;
          pCtrlSend->idxClient =  IDX_CLIENT;
          pCtrlSend->emACDC_WorkStatuCtrl = emACDC_WorkStatuReq;
          pCtrlSend->byWorkAsap = pEMS->byWorkAsap;
          // pCtrlSend->nStopSoc = (BYTE)pEMS->planSectorNow.nStopSOC;
          // pCtrlSend->nBattGrpPwrLmtPlan = pEMS->planSectorNow.nBattGrpPwrLmt;
          switch (emACDC_WorkStatuReq)
          {
              case STATU_ACDC_RECTIFY:
                  pCtrlSend->fActv_PwrCtrl = MIN(pCalc_Node->fACDC_Distri_4_BattChrg_Pwr, pB6LSYS->fMax_Chrg_PwrAbilitySum);
                  break;
              case STATU_ACDC_INVERTER:
                  // printf("pB6LSYS->fMax_DisChrg_PwrAbilitySum = [%0.3f]\n",pB6LSYS->fMax_DisChrg_PwrAbilitySum);
                  pCtrlSend->fActv_PwrCtrl = MIN(pCalc_Node->fBatt_Distri_4_ACDC_Pwr, pB6LSYS->fMax_DisChrg_PwrAbilitySum);
                  break;
    
              default:
                  pCtrlSend->fActv_PwrCtrl = 0;
                  pSMInfo->tm_LastStopWork = tmNow;
                  break;
          }
    
          float fActv_PwrReqDiff = (float)fabs(pSMInfo->fActv_PwrCtrl_Last - pCtrlSend->fActv_PwrCtrl);
          float fActv_PwrOutDiff = (float)fabs(fabs(pB6LSYS->fActvPwr_PCSGrpSum) - pCtrlSend->fActv_PwrCtrl);
          // printf("Send_6330UctlWorkPwrAuto_UDP n:%d fActv_PwrReqDiff:%.1f fActv_PwrOutDiff:%.1f fActv_Pwr:%.1f "
          //     "fActv_PwrCtrl_Last:%.1f fActvPwr_PCSGrpSum:%.1f\n",
          //     n, fActv_PwrReqDiff, fActv_PwrOutDiff, pCtrlSend->fActv_PwrCtrl,
          //     pSMInfo->fActv_PwrCtrl_Last, pB6LSYS->fActvPwr_PCSGrpSum);
          // if (fActv_PwrReqDiff>MIN_DIFF_VALUE_REQ || pCtrlSend->emACDC_WorkStatuCtrl!=pSMInfo->emACDC_WorkStatuCtrl_Last ||
          //     fActv_PwrOutDiff>MIN_PWR_OUT_VSREQ ||
          //     (ElapseMS(pSMInfo->tm_LastSendCmd_2Pcs) >= (UINT)msElapse))
          {
              USHORT usDataLen = (USHORT)sizeof(S63Uctl_30WorkPwr);
              TRACED("Send_6330UctlWorkPwrAuto_UDP n:%d emACDC_WorkStatuCtrl:%d fActv_Pwr:%.1f fActv_PwrReqDiff:%.1f fActv_PwrOutDiff:%.1f "
                  "fActv_PwrCtrl_Last:%.1f fActvPwr_PCSGrpSum:%.1f ElapseMS:%u %d <%s>\n",
                  n, pCtrlSend->emACDC_WorkStatuCtrl, pCtrlSend->fActv_PwrCtrl, fActv_PwrReqDiff, fActv_PwrOutDiff,
                  pSMInfo->fActv_PwrCtrl_Last, pB6LSYS->fActvPwr_PCSGrpSum, ElapseMS(pSMInfo->tm_LastSendCmd_2Pcs), msElapse,
                  printableBytes((BYTE*)pCtrlSend, usDataLen));
              // LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING,
              //     "Send_6330UctlWorkPwrAuto_UDP Sent fActv_PwrCtrl[%.1f]",pCtrlSend->fActv_PwrCtrl);
    
              pSMInfo->emACDC_WorkStatuCtrl_Last = pCtrlSend->emACDC_WorkStatuCtrl;
              pSMInfo->fActv_PwrCtrl_Last =        pCtrlSend->fActv_PwrCtrl;
              pSMInfo->byWorkASAP_Last =           pCtrlSend->byWorkAsap;
              pSMInfo->tm_LastSendCmd_2Pcs = tmNow;
              reverseBytes((BYTE*)pCtrlSend, usDataLen);
              DoUDPSession_SEMS(n, NL_MSG_UCTL,  (BYTE*)pCtrlSend, usDataLen, MAX_MS_DoTCPSession_SEMS_NoWaitREP);
          }
    
          return TRUE;
      }
    
      BOOL Send_CCU_Ctrl(ROUGH_CCU_DATA* pCCU, A_BLOB_GRP_RECT_AUTOCTRL_CMD CCUCtrlCmd)
      {
          int nRet = 0;
          BYTE byData[16]={'\0'};
          memset(byData, 0, 16);
          int nSendLen = 0;
    
          A_BLOB_GRP_RECT_AUTOCTRL_CMD* pCmd = &CCUCtrlCmd;
    
          //get battGrp info
          A_BLOB_SINGLE_STBATT_RUNINFO  blobSingleBattData;
          VAR_VALUE varV;
          varV.blobValue = (BYTE *)&blobSingleBattData;
          // LJH241111 PcsSCADA
          if ( HasEMSFeature(pCCU->lcCfg.emSystemType) )
          {
              pCCU->pDPR->Get(pCCU->hSampler, GETTYPE_SP_VAR, 
                  501 + pCmd->nGrp, 551, &varV, sizeof(A_BLOB_SINGLE_STBATT_RUNINFO));
          }
    
          TRACEI("-------Send_CCU_Ctrl cmd A_BLOB_GRP_RECT_AUTOCTRL_CMD[%d] nGrpType=[%d] , fReq[%.1f, %.1f], fLmt[%.1f, %.1f]-----\n", 
              pCmd->nGrp, pCmd->emCtrl_Type, pCmd->fGrpVoltLmt, pCmd->fGrpCurrLmt, blobSingleBattData.fAllow_LmtChargeVolt, blobSingleBattData.fAllow_MaxChargeCurr);
    
          memcpy(&pCCU->blobGrpRectCtrl, pCmd, sizeof(A_BLOB_GRP_RECT_AUTOCTRL_CMD));
    
          TRACEI("Send_CCU_Ctrl emCharge_Statu=%d,nVoltReq=%.1f, nCurrReq=%.1f\n",\
          pCCU->blobGrpRectCtrl.emCtrl_Type,\
          pCCU->blobGrpRectCtrl.fGrpVoltLmt,\
          pCCU->blobGrpRectCtrl.fGrpCurrLmt);
    
          if(pCCU->blobGrpRectCtrl.emCtrl_Type <= GRP_ACDC_STOP)
          {
              byData[0] = 0x31 + pCmd->nGrp;
              nSendLen = 6;
              if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_STOP)
              {
                  byData[1] = 0x00;
              }
              else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_START)
              {
                  byData[1] = 0x01;
              }
              else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_ADJ_CURR)
              {
                  byData[1] = 0x02;
              }
              else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_INVERT_START)
              {
                  byData[1] = 0x03;
              }
              else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_INVERT_PENDING)
              {
                  byData[1] = 0x04;
              }
              if (HasEMSFeature(pCCU->lcCfg.emSystemType))
              {
                  TRACE("Send_CCU_Ctrl:Control HasEMSFeature fGrpVoltLmt:%.1f %.1f fGrpCurrLmt:%.1f %.1f %.1f\n",
                      pCCU->blobGrpRectCtrl.fGrpVoltLmt, blobSingleBattData.fAllow_LmtChargeVolt,
                      pCCU->blobGrpRectCtrl.fGrpCurrLmt, blobSingleBattData.fAllow_MaxChargeCurr, blobSingleBattData.fAllow_MaxDisChargeCurr);
    
                  pCCU->blobGrpRectCtrl.fGrpVoltLmt = MIN(pCCU->blobGrpRectCtrl.fGrpVoltLmt,
                      blobSingleBattData.fAllow_LmtChargeVolt);
    
                  if((pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_START) ||
                  (pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_ADJ_CURR))
                  {
                      pCCU->blobGrpRectCtrl.fGrpCurrLmt = MIN(pCCU->blobGrpRectCtrl.fGrpCurrLmt,
                          blobSingleBattData.fAllow_MaxChargeCurr);
                  }
                  else
                  {
                      pCCU->blobGrpRectCtrl.fGrpCurrLmt = MIN(pCCU->blobGrpRectCtrl.fGrpCurrLmt,
                          blobSingleBattData.fAllow_MaxDisChargeCurr);
                  }
              }
              TRACE("Send_CCU_Ctrl emCtrl_Type=%d ,nVoltReq=%.1f, nCurrReq=%.1f\n",\
                  pCCU->blobGrpRectCtrl.emCtrl_Type,\
                  pCCU->blobGrpRectCtrl.fGrpVoltLmt,\
                  pCCU->blobGrpRectCtrl.fGrpCurrLmt);
              // LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING,
              //         "Sent to IMSU-D ---emCtrl_Type[%d] fActivePower[%.1f] fGrpVoltLmt[%.1f] fGrpCurrLmt[%.1f]",
              //         pCCU->blobGrpRectCtrl.emCtrl_Type,
              //         pCCU->blobGrpRectCtrl.fGrpVoltLmt * pCCU->blobGrpRectCtrl.fGrpCurrLmt,
              //         pCCU->blobGrpRectCtrl.fGrpVoltLmt,
              //         pCCU->blobGrpRectCtrl.fGrpCurrLmt);
    
              byData[2] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10) >> 8);
              byData[3] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10);
              byData[4] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10) >> 8);
              byData[5] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10);
          }
          else if(pCCU->blobGrpRectCtrl.emCtrl_Type <= GRP_DCDC_STOP)
          {
              byData[0] = 0x51 + pCmd->nGrp;
              nSendLen = 4;
              if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_STOP)
              {
                  byData[1] = 0x00;
              }
              else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_RECTIFY_START)
              {
                  byData[1] = 0x01; 
              }
              else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_RECTIFY_PENDING)
              {
                  byData[1] = 0x02; 
              }
    
              if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_RECTIFY_START && HasEMSFeature(pCCU->lcCfg.emSystemType))
              {
                  pCCU->blobGrpRectCtrl.fGrpCurrLmt = MIN(pCCU->blobGrpRectCtrl.fGrpCurrLmt,
                      blobSingleBattData.fAllow_MaxDisChargeCurr);
              }
              byData[2] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10) >> 8);  
              byData[3] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10);  
    
              byData[4] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10) >> 8);
              byData[5] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10);
          }
          TRACE("Send_CCU_Ctrl:Control emCtrl_Type:%d(0:rect 2:invt 4:stop) nGrp:%d UI:%.1f %.1f byData<%s>\n",
              pCCU->blobGrpRectCtrl.emCtrl_Type, pCCU->blobGrpRectCtrl.nGrp,
              pCCU->blobGrpRectCtrl.fGrpVoltLmt,
              pCCU->blobGrpRectCtrl.fGrpCurrLmt, printableBytes(byData, nSendLen));
          //2020-01-04 Do Not Wait Adjust Current Cmd
          nRet = DoTCPSession(pCCU, NL_MSG_UCTL, byData, nSendLen, 0) ? ERR_CONTROL_OK : ERR_CONTROL_FAILED;
    
          return nRet;
      }
    

三、 关键技术实现

1. "TCP+UDP" 双通道通信架构

这是该系统设计的精妙之处:

  • TCP通道:用于心跳、配置更新、注册等对可靠性要求高的逻辑。

  • UDP快速通道ReOpen_QuicklyCommPort_UDP_SERVER):在检测到逆流时,系统使用 UDP 协议下发功率指令。

    • 原因:UDP 无需握手,报文头小,延迟极低(微秒级),适合防逆流这种“争分夺秒”的场景。

    • 实现:每接入一个 TCP 客户端,系统会对应打开一个本地 9989 端口与客户端 10005 端口的 UDP 映射。

2. Linux 实时线程优化

Initialize_EMS_STATION 中,对处理线程做了如下配置:

  • 调度策略SCHED_RR(实时循环调度)。

  • 优先级98(极高优先级,仅次于系统内核关键进程)。

  • 目的:确保在系统 CPU 负载较高时,防逆流线程能优先获得处理权,避免因系统卡顿导致的响应延迟。

3. 基于信号量(Semaphore)的进程间通信

  • 使用 semget/semop 实现异步唤醒。这比互斥锁(Mutex)或条件变量更轻量,适合这种“单向通知”的快速触发机制。

三、 系统设计的关键亮点(难点突破)

1. 通信故障的安全闭锁(Fail-Safe)

代码中有一段在逻辑中非常关键的部分:

  • 逻辑:如果监测到电表通信中断(bIsCommFail),系统会立即下发 STATU_ACDC_IDLE 指令。

  • 意义:防逆流系统最怕“致盲”。如果电表坏了,系统不知道是否在逆流,最安全的做法是让储能立即停止放电(进入 IDLE 状态),防止在盲目状态下造成严重的逆流事故。

2. 跨层级 IP 自动映射

Initialize_EMS_STATION 中:

  • 系统通过 szFirstIP_EMSClientnNodeNum 自动管理多个下级节点。

  • 这种设计支持大规模分布式部署,一台站级 EMS 可以同时管理几十台储能一体机,并实现毫秒级的同步调度。

3. 兼容性设计(多模式运行)

系统能识别自己是“一体机”还是“站级站控”:

  • 一体机模式:直接通过 RS485 采样并控制。

  • 站级模式:负责收集全局数据,并通过网络(UDP)指挥各子节点。

System V 信号量信号量重点讲解:​

System V 信号量因其强大的内核持久性和功能集,依然是处理多进程/多线程同步的“老将”。

一、 函数功能深度解析

1. 钥匙的生成:ftok

Creat_SemID 中,第一步是 ftok(pathname, proj_id)

  • 原理:信号量是内核资源,必须有一个唯一的标识。ftok 将一个真实存在的路径名和一个项目 ID(8位)转换成一个系统唯一的 key_t 值。

  • 关键点:只要两个进程使用相同的路径和项目 ID,它们就能拿到同一把“钥匙”,从而访问同一个信号量。

2. 仓库的创建:semget

  • 功能:根据钥匙(key)获取信号量集的 ID。

  • 参数 semflg

    • IPC_CREAT:如果信号量不存在则创建。

    • 0666:设置权限(类似 Linux 文件权限),确保你的多个进程有权读写它。

  • 注意:System V 支持信号量集(一次创建多个信号量),你的代码中 nsems 设为 1,表示这个集合里只有一个信号量。

3. 初始化:Init_Semsemctl

  • 痛点semget 创建完信号量后,它的初始值是不确定的。

  • 功能:使用 SETVAL 命令将信号量设置为具体的值(比如 0 或 1)。

  • 原理:在你的防逆流代码中,初始值设为 0。这意味着处理线程一启动就会被阻塞,直到判定函数“释放”信号。

4. 核心操作:semop (Wait & Post)

这是最硬核的部分,通过填充 struct sembuf 结构体来告知内核:

  • sem_num: 操作集合中的第几个信号量(从 0 开始)。

  • sem_op:

    • -1 (P 操作/Wait):尝试扣减。如果当前值 > 0,则扣减 1 并继续;如果当前值 = 0,进程进入休眠阻塞状态,直到有人把它加回来。

    • +1 (V 操作/Post):增加计数值。如果有进程在阻塞等待,内核会唤醒它。

  • sem_flg:

    • 你的代码设为 0。通常在工业中,建议考虑 SEM_UNDO(见后文注意事项)。

二、 运作原理:内核里的“VIP 门禁卡”

  1. 内核维护:信号量不是存储在用户态内存里的,而是由 Linux 内核维护的一个整数。即使你的程序崩溃了,这个值依然存在于内核中。

  2. 原子性semop 操作是原子性的。意味着如果有 100 个线程同时尝试扣减信号量,内核会确保操作按顺序一个一个来,绝不会出现数据竞争(Race Condition)。

  3. 休眠队列:当信号量为 0 且执行 sem_wait_op 时,内核会将该线程放入一个等待队列并切出 CPU。这比 while(1) 轮询要省电、高效得多,因为不占 CPU 资源。


三、 工业开发中的致命注意事项(踩坑指南)

1. “僵尸”信号量:内核持久性

System V 信号量不会随进程结束而自动销毁。

  • 后果:如果你的 EMS 程序崩溃了,下次启动时,旧的信号量值可能还是之前的错误状态。

  • 对策:在程序正常退出或 main 函数启动前,显式调用 semctl(semid, 0, IPC_RMID) 来清理旧的信号量,或者在 ipcs -s 命令下手动查看。

2. 初始化竞争(Race Condition)

  • 风险:如果两个进程同时运行 Initialize_ESS,可能进程 A 刚 semget 完还没 Init_Sem,进程 B 就已经开始 sem_wait_op 了。

  • 对策:通常由主进程(Master)统一创建并初始化,子进程(Slave)只负责获取(不带 IPC_CREAT 标志)。

3. 忘记设置 SEM_UNDO

  • 场景:如果一个线程执行了 sem_wait_op 成功拿到了资源,但随后它**崩溃(Segfault)**了,没有执行 sem_post_op

  • 后果:信号量将永远锁定,其他线程会无限期死等。

  • 对策:将 sem_flg 设为 SEM_UNDO。这样如果进程异常退出,内核会自动撤销该进程对信号量所做的修改。

4. 信号量计数值的上限

虽然你只是用来做 0/1 切换,但如果触发非常频繁且处理线程处理太慢,信号量的值会一直累加。

  • 风险:如果判定函数疯狂执行 sem_post_op(比如 485 电表采样极快),可能会导致处理线程堆积了成千上万个待办任务。

四、 总结

这套代码的核心价值在于**“解耦”:

  • 电表采样线程只管判定,判定成功就丢一个“信号”到内核。

  • 控制执行线程在内核门口等着,信号一到立刻干活。