The implementation of anti-backflow logic based on Linux real-time threads and UDP fast channels.(PART ONE)
这段时间接触了一个防逆流的需求开发,整理一下基本的控制策略。
一、 防逆流系统的核心逻辑流程
整个防逆流过程可以被划分为:采样监测 -> 逻辑判定 -> 信号触发 -> 快速执行 四个阶段。我们按照分层的思想来讲解:
感知层:RS485 电表采样。
决策层:EMS 站级控制算法。
执行层:PCS 逆变器功率调整。
同时通讯双轨制:
TCP(控制面):处理心跳、配置、注册,保障 100% 到达。
UDP(数据面):专为防逆流设计的“快速通道”,规避拥塞控制,追求极低延迟。
二、技术解答
1. 采样监测(判定阶段)
“紧急警报器”安装,分为485电表线程,单柜的初始化函数,站控的初始化函数,以及站控的上报函数
pWhim_Rough->nSemCCUWith485Meter = Creat_SemID(SEM_CCU_485_PATH, SEM_CCU_485_PROJ_ID, 1, IPC_CREAT | 0666);
printf("whMeterSampler semid = [%d]\n",pWhim_Rough->nSemCCUWith485Meter);
LogOut(MAIN_MODULE_MAIN, "whMeterSampler", LOG_TYPE_INFO,
"whMeterSampler semid --> [%d] !\n",pWhim_Rough->nSemCCUWith485Meter);
if (pWhim_Rough->nSemCCUWith485Meter == -1)
{
LogOut(MAIN_MODULE_REPORTER, "whMeterSampler", LOG_TYPE_ERROR,
"Init SEMID FAILED!!!");
}
它在系统启动阶段申请了一个 System-V 信号量 (Semaphore)。
核心功能:创建一个跨进程/线程的“计数器”。
Creat_SemID:利用ftok产生的唯一 Key 获取一个信号量 ID。IPC_CREAT | 0666:如果没有就创建,并赋予读写权限。目的:为后续的异步控制提供物理句柄。没有这个 ID,判定函数就找不到“警报开关”,处理线程也收不到“警报信号”。
在 Judge_ActvPower_Reflux 函数中,系统实时监控电力仪表的有功功率:电表采样线程通道,电表采样Modbus快表。
双表比对:系统同时获取“柜内表(PCS侧)”和“网侧表(Grid侧)”的数据。
判定标准:
fGridPwr_Sum < 0:代表电网侧检测到功率反向流动(即逆流发生)。fCabPwr_Sum < 0:代表储能系统正处于放电状态。
结论:当两者同时为负时,说明储能放电过大,导致电能倒灌回电网,触发防逆流调节。
2. 逻辑判定
函数分析:
Judge_ActvPower_Reflux(单柜),站控(_ThreadEntry_SlaveUDPComm)逻辑关键:
双表比对:网侧表 (Grid) 与柜内表 (Cabinet) 功率方向的一致性判定。
功率阈值计算:何时触发
sem_post_op?代码段展示:分析有功功率为负时的处理逻辑。
....................
//init ccu and slave reporter sem
pSlave->nSemWithCCUAndSlave = Creat_SemID(SEM_CCU_485_PATH, SEM_CCU_485_PROJ_ID, 1, IPC_CREAT | 0666);
TRACE("slave reporter Init semid = [%d]\n",pSlave->nSemWithCCUAndSlave);
LogOut(MAIN_MODULE_MAIN, "SLAVE_UDP_REPORTER", LOG_TYPE_INFO,
"SLAVE_UDP_REPORTER semid Init --> [%d] !\n",pSlave->nSemWithCCUAndSlave);
if (pSlave->nSemWithCCUAndSlave != -1)
{
Init_Sem(pSlave->nSemWithCCUAndSlave, 0, SETVAL, 0);
}
if (!ReOpen_QuicklyCommPort_UDP_CLIENT(pReporter))
{
TRACEE("Slave Initialize ReOpen_QuicklyCommPort_UDP_CLIENT failed");
return FALSE;
}
pReporter->hThread = RunThread_Create("SlaveTCPComm",
(RUN_THREAD_START_PROC)_ThreadEntry_SlaveTCPComm,
(void *)(pReporter),
(DWORD*)&pReporter->bExit);
if(pReporter->hThread == NULL)
{
LogOut(MAIN_MODULE_REPORTER, "SlaveTCPComm", LOG_TYPE_ERROR,
"Init SlaveTCPComm FAILED!!!");
return FALSE;
}
//----- Create A Thread To reveive the UDP messages from SEMS -----//
struct sched_param param;
pthread_attr_t attr;
pthread_t hThread;
pthread_attr_init(&attr);
param.sched_priority = 1; //数值越大,优先级越高。
pthread_attr_setschedpolicy(&attr,SCHED_RR); //实时调度策略
pthread_attr_setschedparam(&attr,¶m); //设置优先级
pthread_attr_setinheritsched(&attr,PTHREAD_EXPLICIT_SCHED); //要使优先级其作用必须要有这句话
if(pthread_create(&hThread, &attr, _ThreadEntry_SlaveUDPComm,
(void *)pReporter) != 0) //0-Success
{
TRACE("----- Create _ThreadEntry_SlaveUDPComm Failed -----\n");
LogOut(MAIN_MODULE_REPORTER, "SlaveUDPComm", LOG_TYPE_ERROR,
"Init _ThreadEntry_SlaveUDPComm FAILED!!!");
pthread_attr_destroy(&attr);
return FALSE;
}
.................
static DWORD _ThreadEntry_SlaveUDPComm(void* pArg)
{
REPORTER* pReporter = (REPORTER*)pArg;
ROUGH_SLAVE_ROUGH* pSlave = (ROUGH_SLAVE_ROUGH*)pReporter->pDataHeap;
BYTE byBuff[TCP_BUFF_LEN];
int nReadBytes = 0;
memset(byBuff, 0, TCP_BUFF_LEN);
while(!pReporter->bExit)
{
//block wait receive 6330-UCTRL
nReadBytes = s_pUDPCommClient->Read(s_hUDPPortClient, byBuff, TCP_BUFF_LEN);
TRACE("_ThreadEntry_SlaveUDPComm Read...nReadBytes=[%d]\n",nReadBytes);
if(nReadBytes >= NLMSG_MIN_LEN)
{
//now try to find the frame and parse it
BYTE* pFoundPos = byBuff;
do
{
BYTE *pDataPos;
USHORT usDataLen;
NLMSG_FRAME_INFO* pFrm = Find_NLMsgFrame(pSlave, pFoundPos, nReadBytes, &pDataPos, &usDataLen);
if(pFrm == NULL)
{
TRACE("---Get Frm Is NULL---\n");
break;
}
else if(/*(pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_CFGG].byCmd)||
(pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_CFGS].byCmd)||
(pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_USYS].byCmd)||
(pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_RTIF].byCmd)||*/
(pFrm->byCmd == pSlave->nlFrameInfo[NL_MSG_UCTL].byCmd))
{
TRACE("slave Do UDP Parsing byCmd=[%02X]\n",pFrm->byCmd);
//TRACE("NL_MSG_CFGG[%d] NL_MSG_CFGS[%d] NL_MSG_UCTL[%d] NL_MSG_RTIF[%d] NL_MSG_USYS[%d] ",pSlave->nlFrameInfo[NL_MSG_CFGG].byCmd,pSlave->nlFrameInfo[NL_MSG_CFGS].byCmd,pSlave->nlFrameInfo[NL_MSG_UCTL].byCmd,pSlave->nlFrameInfo[NL_MSG_RTIF].byCmd,pSlave->nlFrameInfo[NL_MSG_USYS].byCmd);
pFrm->pfnParser(pSlave, pDataPos, usDataLen, PROTOCOL_UDP);
if (pSlave->nSemWithCCUAndSlave != -1)
{
// static time_t tmDebugOutSentSem = 0;
// if (ElapseS(tmDebugOutSentSem) > 30)
// {
// tmDebugOutSentSem = _NOW_;
// TRACEI("============================slaveReporter sent sem to ccusampler time [%0.3f]============================\n",tmNow);
// }
sem_post_op(pSlave->nSemWithCCUAndSlave);//send sem to ccusampler
}
}
else
{
TRACE("Info: Maybe Other Msg byCmd=[%02X]\n", pFrm->byCmd);
}
nReadBytes -= usDataLen + NLMSG_MIN_LEN;
pFoundPos += usDataLen + NLMSG_MIN_LEN;
} while (nReadBytes >= NLMSG_MIN_LEN);
}
}
}
static BOOL Judge_ActvPower_Reflux(IN RS485_SAMPLER_DATA* pWhim_Rough)
{
//如果没有安装柜内表或者网侧表,就不处理。例如站级EMS-从设备没有安装网侧表,只安装了柜内表;不需要处理
//站级EMS 快速防逆流必须接柜内表和网侧表
//从设备 只安装了柜内表
//一体机 安装了柜内表和网侧表
if((pWhim_Rough->meterData[407 - KWHMETER_INDEX_START].emMeterType == METER_TYPE_NotInstalled)
|| (pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].emMeterType == METER_TYPE_NotInstalled))
{
return 0;
}
char TempBuf[512];
float fGridPwr_Sum = 0, fGridPwr_A = 0, fGridPwr_B = 0, fGridPwr_C = 0, fCabPwr_Sum = 0;
double tmNow = GetCurrentTime(); // current time.
fCabPwr_Sum = pWhim_Rough->meterData[407 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_RTACTPower].fValue; //cab
fGridPwr_Sum = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_RTACTPower].fValue; //grid meter
fGridPwr_A = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_ACTPower_PhA].fValue;
fGridPwr_B = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_ACTPower_PhB].fValue;
fGridPwr_C = pWhim_Rough->meterData[408 - KWHMETER_INDEX_START].varWHMeterRough[WHMETER_ROUGH_ACTPower_PhC].fValue;
int n;
for(n = 0 ; n < MAX_WH_NUM; n++)
{
if(pWhim_Rough->meterData[n].bIsCommFail == FALSE)
{
continue;
}
if(ElapseMS(LastSentPostTime) > 10000)
{
LastSentPostTime = tmNow;
TRACEW("Meter Alarm [%d] time [ %0.3f ]", n, tmNow);
// LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING,
// "Meter Alarm [%d]",n);
sem_post_op(pWhim_Rough->nSemCCUWith485Meter); //send sem to ccusampler
return TRUE;
}
}
if ((fCabPwr_Sum < 0) && (fGridPwr_Sum < 0))
{
if (pWhim_Rough->nSemCCUWith485Meter != -1)
{
TRACEW("Start to Adjust ---Grid[ %0.3f ] Cab[ %0.3f ] time [ %0.3f ]",fGridPwr_Sum,fCabPwr_Sum,tmNow);
// LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING,
// "Start to Adjust ---Grid[ %0.3f ] Cab[ %0.3f ]",fGridPwr_Sum,fCabPwr_Sum);
LastSentPostTime = GetCurrentTime();
sem_post_op(pWhim_Rough->nSemCCUWith485Meter); //send sem to ccusampler
}
}
return TRUE;
}
3. 信号触发(同步阶段)
为了追求极致的响应速度,系统放弃了传统的轮询(Polling),改用事件驱动(Event-Driven):
一旦判定逆流,调用
sem_post_op(信号量释放)。上面代码Judge_ActvPower_Reflux中的sem_post_op释放信号量。此动作会瞬间唤醒处于阻塞等待状态的高优先级执行线程。
int Creat_SemID(const char *pathname,int proj_id,int nsems ,int semflg)
{
key_t key = ftok(pathname, proj_id);
int semid = semget(key, nsems, semflg);
return semid;
}
int Init_Sem(int semID,int nsems,int cmd,int val)
{
int Ret = semctl(semID,nsems,cmd,val);
return Ret;
}
void sem_wait_op(int semid)
{
struct sembuf sop = {0, -1, 0};
semop(semid, &sop, 1);
}
void sem_post_op(int semid)
{
struct sembuf sop = {0, 1, 0};
semop(semid, &sop, 1);
}
4. 快速执行(调度阶段)
在 _thread_SEMS_Process_SemFrom485 线程中:
重新计算:调用
Ems_Mgmt_Main_SEMS,根据当前逆流的功率差值,计算出每个储能节点(Node)应该调整到的目标功率。指令下发:站控则通过
Send_6330UctlWorkPwrAuto_UDP发送控制指令。单柜则通过下发Send_CCU_CtrlBOOL Send_6330UctlWorkPwrAuto_UDP(ROUGH_CCU_DATA* pCCU, ACDC_WORKSTATU emACDC_WorkStatuReq, int n, int msElapse) { EMS_MGMT_STU *pEMS = &pCCU->emsMgmtData; A_BLOB_BRANCH_NODE_CALC_DATA* pCalc_Node = &pEMS->calc_Node[n]; A_BLOB_BRANCH_GRP_RECTDATA* pBranchGrpData = &pCCU->strLSYSInfo.branchNodeData[n]; PcsSMInfo* pSMInfo = &pCCU->semsSMInfos[n]; SEMS_B6_LSYS_DATA_INFO* pB6LSYS = &pBranchGrpData->b6LSYSLast_SEMS; double tmNow = GetCurrentTime(); S63Uctl_30WorkPwr* pCtrlSend = &pSMInfo->sems6330ctrlPwrReq; pCtrlSend->subCmd = 0x30; pCtrlSend->idxClient = IDX_CLIENT; pCtrlSend->emACDC_WorkStatuCtrl = emACDC_WorkStatuReq; pCtrlSend->byWorkAsap = pEMS->byWorkAsap; // pCtrlSend->nStopSoc = (BYTE)pEMS->planSectorNow.nStopSOC; // pCtrlSend->nBattGrpPwrLmtPlan = pEMS->planSectorNow.nBattGrpPwrLmt; switch (emACDC_WorkStatuReq) { case STATU_ACDC_RECTIFY: pCtrlSend->fActv_PwrCtrl = MIN(pCalc_Node->fACDC_Distri_4_BattChrg_Pwr, pB6LSYS->fMax_Chrg_PwrAbilitySum); break; case STATU_ACDC_INVERTER: // printf("pB6LSYS->fMax_DisChrg_PwrAbilitySum = [%0.3f]\n",pB6LSYS->fMax_DisChrg_PwrAbilitySum); pCtrlSend->fActv_PwrCtrl = MIN(pCalc_Node->fBatt_Distri_4_ACDC_Pwr, pB6LSYS->fMax_DisChrg_PwrAbilitySum); break; default: pCtrlSend->fActv_PwrCtrl = 0; pSMInfo->tm_LastStopWork = tmNow; break; } float fActv_PwrReqDiff = (float)fabs(pSMInfo->fActv_PwrCtrl_Last - pCtrlSend->fActv_PwrCtrl); float fActv_PwrOutDiff = (float)fabs(fabs(pB6LSYS->fActvPwr_PCSGrpSum) - pCtrlSend->fActv_PwrCtrl); // printf("Send_6330UctlWorkPwrAuto_UDP n:%d fActv_PwrReqDiff:%.1f fActv_PwrOutDiff:%.1f fActv_Pwr:%.1f " // "fActv_PwrCtrl_Last:%.1f fActvPwr_PCSGrpSum:%.1f\n", // n, fActv_PwrReqDiff, fActv_PwrOutDiff, pCtrlSend->fActv_PwrCtrl, // pSMInfo->fActv_PwrCtrl_Last, pB6LSYS->fActvPwr_PCSGrpSum); // if (fActv_PwrReqDiff>MIN_DIFF_VALUE_REQ || pCtrlSend->emACDC_WorkStatuCtrl!=pSMInfo->emACDC_WorkStatuCtrl_Last || // fActv_PwrOutDiff>MIN_PWR_OUT_VSREQ || // (ElapseMS(pSMInfo->tm_LastSendCmd_2Pcs) >= (UINT)msElapse)) { USHORT usDataLen = (USHORT)sizeof(S63Uctl_30WorkPwr); TRACED("Send_6330UctlWorkPwrAuto_UDP n:%d emACDC_WorkStatuCtrl:%d fActv_Pwr:%.1f fActv_PwrReqDiff:%.1f fActv_PwrOutDiff:%.1f " "fActv_PwrCtrl_Last:%.1f fActvPwr_PCSGrpSum:%.1f ElapseMS:%u %d <%s>\n", n, pCtrlSend->emACDC_WorkStatuCtrl, pCtrlSend->fActv_PwrCtrl, fActv_PwrReqDiff, fActv_PwrOutDiff, pSMInfo->fActv_PwrCtrl_Last, pB6LSYS->fActvPwr_PCSGrpSum, ElapseMS(pSMInfo->tm_LastSendCmd_2Pcs), msElapse, printableBytes((BYTE*)pCtrlSend, usDataLen)); // LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING, // "Send_6330UctlWorkPwrAuto_UDP Sent fActv_PwrCtrl[%.1f]",pCtrlSend->fActv_PwrCtrl); pSMInfo->emACDC_WorkStatuCtrl_Last = pCtrlSend->emACDC_WorkStatuCtrl; pSMInfo->fActv_PwrCtrl_Last = pCtrlSend->fActv_PwrCtrl; pSMInfo->byWorkASAP_Last = pCtrlSend->byWorkAsap; pSMInfo->tm_LastSendCmd_2Pcs = tmNow; reverseBytes((BYTE*)pCtrlSend, usDataLen); DoUDPSession_SEMS(n, NL_MSG_UCTL, (BYTE*)pCtrlSend, usDataLen, MAX_MS_DoTCPSession_SEMS_NoWaitREP); } return TRUE; }BOOL Send_CCU_Ctrl(ROUGH_CCU_DATA* pCCU, A_BLOB_GRP_RECT_AUTOCTRL_CMD CCUCtrlCmd) { int nRet = 0; BYTE byData[16]={'\0'}; memset(byData, 0, 16); int nSendLen = 0; A_BLOB_GRP_RECT_AUTOCTRL_CMD* pCmd = &CCUCtrlCmd; //get battGrp info A_BLOB_SINGLE_STBATT_RUNINFO blobSingleBattData; VAR_VALUE varV; varV.blobValue = (BYTE *)&blobSingleBattData; // LJH241111 PcsSCADA if ( HasEMSFeature(pCCU->lcCfg.emSystemType) ) { pCCU->pDPR->Get(pCCU->hSampler, GETTYPE_SP_VAR, 501 + pCmd->nGrp, 551, &varV, sizeof(A_BLOB_SINGLE_STBATT_RUNINFO)); } TRACEI("-------Send_CCU_Ctrl cmd A_BLOB_GRP_RECT_AUTOCTRL_CMD[%d] nGrpType=[%d] , fReq[%.1f, %.1f], fLmt[%.1f, %.1f]-----\n", pCmd->nGrp, pCmd->emCtrl_Type, pCmd->fGrpVoltLmt, pCmd->fGrpCurrLmt, blobSingleBattData.fAllow_LmtChargeVolt, blobSingleBattData.fAllow_MaxChargeCurr); memcpy(&pCCU->blobGrpRectCtrl, pCmd, sizeof(A_BLOB_GRP_RECT_AUTOCTRL_CMD)); TRACEI("Send_CCU_Ctrl emCharge_Statu=%d,nVoltReq=%.1f, nCurrReq=%.1f\n",\ pCCU->blobGrpRectCtrl.emCtrl_Type,\ pCCU->blobGrpRectCtrl.fGrpVoltLmt,\ pCCU->blobGrpRectCtrl.fGrpCurrLmt); if(pCCU->blobGrpRectCtrl.emCtrl_Type <= GRP_ACDC_STOP) { byData[0] = 0x31 + pCmd->nGrp; nSendLen = 6; if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_STOP) { byData[1] = 0x00; } else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_START) { byData[1] = 0x01; } else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_ADJ_CURR) { byData[1] = 0x02; } else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_INVERT_START) { byData[1] = 0x03; } else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_INVERT_PENDING) { byData[1] = 0x04; } if (HasEMSFeature(pCCU->lcCfg.emSystemType)) { TRACE("Send_CCU_Ctrl:Control HasEMSFeature fGrpVoltLmt:%.1f %.1f fGrpCurrLmt:%.1f %.1f %.1f\n", pCCU->blobGrpRectCtrl.fGrpVoltLmt, blobSingleBattData.fAllow_LmtChargeVolt, pCCU->blobGrpRectCtrl.fGrpCurrLmt, blobSingleBattData.fAllow_MaxChargeCurr, blobSingleBattData.fAllow_MaxDisChargeCurr); pCCU->blobGrpRectCtrl.fGrpVoltLmt = MIN(pCCU->blobGrpRectCtrl.fGrpVoltLmt, blobSingleBattData.fAllow_LmtChargeVolt); if((pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_START) || (pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_ACDC_RECTIFY_ADJ_CURR)) { pCCU->blobGrpRectCtrl.fGrpCurrLmt = MIN(pCCU->blobGrpRectCtrl.fGrpCurrLmt, blobSingleBattData.fAllow_MaxChargeCurr); } else { pCCU->blobGrpRectCtrl.fGrpCurrLmt = MIN(pCCU->blobGrpRectCtrl.fGrpCurrLmt, blobSingleBattData.fAllow_MaxDisChargeCurr); } } TRACE("Send_CCU_Ctrl emCtrl_Type=%d ,nVoltReq=%.1f, nCurrReq=%.1f\n",\ pCCU->blobGrpRectCtrl.emCtrl_Type,\ pCCU->blobGrpRectCtrl.fGrpVoltLmt,\ pCCU->blobGrpRectCtrl.fGrpCurrLmt); // LogOut(MAIN_MODULE_MAIN, "QuickChannel", LOG_TYPE_WARING, // "Sent to IMSU-D ---emCtrl_Type[%d] fActivePower[%.1f] fGrpVoltLmt[%.1f] fGrpCurrLmt[%.1f]", // pCCU->blobGrpRectCtrl.emCtrl_Type, // pCCU->blobGrpRectCtrl.fGrpVoltLmt * pCCU->blobGrpRectCtrl.fGrpCurrLmt, // pCCU->blobGrpRectCtrl.fGrpVoltLmt, // pCCU->blobGrpRectCtrl.fGrpCurrLmt); byData[2] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10) >> 8); byData[3] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10); byData[4] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10) >> 8); byData[5] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10); } else if(pCCU->blobGrpRectCtrl.emCtrl_Type <= GRP_DCDC_STOP) { byData[0] = 0x51 + pCmd->nGrp; nSendLen = 4; if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_STOP) { byData[1] = 0x00; } else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_RECTIFY_START) { byData[1] = 0x01; } else if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_RECTIFY_PENDING) { byData[1] = 0x02; } if(pCCU->blobGrpRectCtrl.emCtrl_Type == GRP_DCDC_RECTIFY_START && HasEMSFeature(pCCU->lcCfg.emSystemType)) { pCCU->blobGrpRectCtrl.fGrpCurrLmt = MIN(pCCU->blobGrpRectCtrl.fGrpCurrLmt, blobSingleBattData.fAllow_MaxDisChargeCurr); } byData[2] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10) >> 8); byData[3] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpVoltLmt*10); byData[4] = (BYTE)((UINT)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10) >> 8); byData[5] = (BYTE)(pCCU->blobGrpRectCtrl.fGrpCurrLmt*10); } TRACE("Send_CCU_Ctrl:Control emCtrl_Type:%d(0:rect 2:invt 4:stop) nGrp:%d UI:%.1f %.1f byData<%s>\n", pCCU->blobGrpRectCtrl.emCtrl_Type, pCCU->blobGrpRectCtrl.nGrp, pCCU->blobGrpRectCtrl.fGrpVoltLmt, pCCU->blobGrpRectCtrl.fGrpCurrLmt, printableBytes(byData, nSendLen)); //2020-01-04 Do Not Wait Adjust Current Cmd nRet = DoTCPSession(pCCU, NL_MSG_UCTL, byData, nSendLen, 0) ? ERR_CONTROL_OK : ERR_CONTROL_FAILED; return nRet; }
三、 关键技术实现
1. "TCP+UDP" 双通道通信架构
这是该系统设计的精妙之处:
TCP通道:用于心跳、配置更新、注册等对可靠性要求高的逻辑。
UDP快速通道(
ReOpen_QuicklyCommPort_UDP_SERVER):在检测到逆流时,系统使用 UDP 协议下发功率指令。原因:UDP 无需握手,报文头小,延迟极低(微秒级),适合防逆流这种“争分夺秒”的场景。
实现:每接入一个 TCP 客户端,系统会对应打开一个本地 9989 端口与客户端 10005 端口的 UDP 映射。
2. Linux 实时线程优化
在 Initialize_EMS_STATION 中,对处理线程做了如下配置:
调度策略:
SCHED_RR(实时循环调度)。优先级:
98(极高优先级,仅次于系统内核关键进程)。目的:确保在系统 CPU 负载较高时,防逆流线程能优先获得处理权,避免因系统卡顿导致的响应延迟。
3. 基于信号量(Semaphore)的进程间通信
- 使用
semget/semop实现异步唤醒。这比互斥锁(Mutex)或条件变量更轻量,适合这种“单向通知”的快速触发机制。
三、 系统设计的关键亮点(难点突破)
1. 通信故障的安全闭锁(Fail-Safe)
代码中有一段在逻辑中非常关键的部分:
逻辑:如果监测到电表通信中断(
bIsCommFail),系统会立即下发STATU_ACDC_IDLE指令。意义:防逆流系统最怕“致盲”。如果电表坏了,系统不知道是否在逆流,最安全的做法是让储能立即停止放电(进入 IDLE 状态),防止在盲目状态下造成严重的逆流事故。
2. 跨层级 IP 自动映射
在 Initialize_EMS_STATION 中:
系统通过
szFirstIP_EMSClient和nNodeNum自动管理多个下级节点。这种设计支持大规模分布式部署,一台站级 EMS 可以同时管理几十台储能一体机,并实现毫秒级的同步调度。
3. 兼容性设计(多模式运行)
系统能识别自己是“一体机”还是“站级站控”:
一体机模式:直接通过 RS485 采样并控制。
站级模式:负责收集全局数据,并通过网络(UDP)指挥各子节点。
System V 信号量信号量重点讲解:
System V 信号量因其强大的内核持久性和功能集,依然是处理多进程/多线程同步的“老将”。
一、 函数功能深度解析
1. 钥匙的生成:ftok
在 Creat_SemID 中,第一步是 ftok(pathname, proj_id)。
原理:信号量是内核资源,必须有一个唯一的标识。
ftok将一个真实存在的路径名和一个项目 ID(8位)转换成一个系统唯一的key_t值。关键点:只要两个进程使用相同的路径和项目 ID,它们就能拿到同一把“钥匙”,从而访问同一个信号量。
2. 仓库的创建:semget
功能:根据钥匙(key)获取信号量集的 ID。
参数
semflg:IPC_CREAT:如果信号量不存在则创建。0666:设置权限(类似 Linux 文件权限),确保你的多个进程有权读写它。
注意:System V 支持信号量集(一次创建多个信号量),你的代码中
nsems设为 1,表示这个集合里只有一个信号量。
3. 初始化:Init_Sem 与 semctl
痛点:
semget创建完信号量后,它的初始值是不确定的。功能:使用
SETVAL命令将信号量设置为具体的值(比如 0 或 1)。原理:在你的防逆流代码中,初始值设为 0。这意味着处理线程一启动就会被阻塞,直到判定函数“释放”信号。
4. 核心操作:semop (Wait & Post)
这是最硬核的部分,通过填充 struct sembuf 结构体来告知内核:
sem_num: 操作集合中的第几个信号量(从 0 开始)。sem_op:-1 (P 操作/Wait):尝试扣减。如果当前值 > 0,则扣减 1 并继续;如果当前值 = 0,进程进入休眠阻塞状态,直到有人把它加回来。
+1 (V 操作/Post):增加计数值。如果有进程在阻塞等待,内核会唤醒它。
sem_flg:- 你的代码设为
0。通常在工业中,建议考虑SEM_UNDO(见后文注意事项)。
- 你的代码设为
二、 运作原理:内核里的“VIP 门禁卡”
内核维护:信号量不是存储在用户态内存里的,而是由 Linux 内核维护的一个整数。即使你的程序崩溃了,这个值依然存在于内核中。
原子性:
semop操作是原子性的。意味着如果有 100 个线程同时尝试扣减信号量,内核会确保操作按顺序一个一个来,绝不会出现数据竞争(Race Condition)。休眠队列:当信号量为 0 且执行
sem_wait_op时,内核会将该线程放入一个等待队列并切出 CPU。这比while(1)轮询要省电、高效得多,因为不占 CPU 资源。
三、 工业开发中的致命注意事项(踩坑指南)
1. “僵尸”信号量:内核持久性
System V 信号量不会随进程结束而自动销毁。
后果:如果你的 EMS 程序崩溃了,下次启动时,旧的信号量值可能还是之前的错误状态。
对策:在程序正常退出或
main函数启动前,显式调用semctl(semid, 0, IPC_RMID)来清理旧的信号量,或者在ipcs -s命令下手动查看。
2. 初始化竞争(Race Condition)
风险:如果两个进程同时运行
Initialize_ESS,可能进程 A 刚semget完还没Init_Sem,进程 B 就已经开始sem_wait_op了。对策:通常由主进程(Master)统一创建并初始化,子进程(Slave)只负责获取(不带
IPC_CREAT标志)。
3. 忘记设置 SEM_UNDO
场景:如果一个线程执行了
sem_wait_op成功拿到了资源,但随后它**崩溃(Segfault)**了,没有执行sem_post_op。后果:信号量将永远锁定,其他线程会无限期死等。
对策:将
sem_flg设为SEM_UNDO。这样如果进程异常退出,内核会自动撤销该进程对信号量所做的修改。
4. 信号量计数值的上限
虽然你只是用来做 0/1 切换,但如果触发非常频繁且处理线程处理太慢,信号量的值会一直累加。
- 风险:如果判定函数疯狂执行
sem_post_op(比如 485 电表采样极快),可能会导致处理线程堆积了成千上万个待办任务。
四、 总结
这套代码的核心价值在于**“解耦”:
电表采样线程只管判定,判定成功就丢一个“信号”到内核。
控制执行线程在内核门口等着,信号一到立刻干活。