小弟在做一個串口數采專案,用的宇泰的設備,現在是有16臺電箱,每臺電箱都連著一個rs232串口控制板,根據燈亮或者滅進行分析。
老板要求我做一個程式,多執行緒異步訪問,把我的程式設定成客戶端,電箱設備設定為服務器。不要吐槽,我也知道我的程式是服務器,電箱是客戶端會簡單很多。但是已經這么做了。現在的問題是,如果電箱(服務器)都是正常的,但是如果某一臺電箱被關閉了,就影響我的數采。我想在socket通訊的時候捕獲錯誤,然后跳過這個執行緒。
另外我的程式相對有點復雜,先是用A程式用多執行緒異步socket訪問電箱,獲取源資料后進行初步處理丟進RabbitMQ,然后B程式從RabbitMQ獲取對應資料再進行第二步分析。
現在看到的現象是,在某一臺電箱斷掉后,別的執行緒還是繼續往RabbitMQ丟資料,但是B程式不讀取了,我試過在A程式中去掉壞了的電箱IP,然后繼續執行,能正常數采,所以影響我數采的是A程式而不是B程式,我現在只想在A程式出錯時,也就是10060錯誤時,跳過或者關閉這一條執行緒不影響我的數采,這一步做完之后再考慮不停的監控,再等待這個壞掉的電箱變好再重新獲取它的資料。
另:宇泰的設備(我這個版本)不支持心跳
高手大佬若能解決我的問題,必有重謝,只求能有真的解決方案,'以專業開發人員為伍'大佬不許回帖,小弟能力有限看不懂你的回答。謝謝!
下貼代碼:
A程式一開始遍歷開啟多執行緒:
LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(16);
List<Task> tasks = new List<Task>();
// Create a TaskFactory and pass it our custom scheduler.
TaskFactory factory = new TaskFactory(lcts);
CancellationTokenSource cts = new CancellationTokenSource();
// Use our factory to run a set of tasks.
Object lockObj = new Object();
List<string> ip_list = new List<string>();
ip_list.Add("10.176.149.11");
ip_list.Add("10.176.149.12");
ip_list.Add("10.176.149.13");
ip_list.Add("10.176.149.14");
ip_list.Add("10.176.149.15");
ip_list.Add("10.176.149.16");
ip_list.Add("10.176.149.17");
ip_list.Add("10.176.149.18");
ip_list.Add("10.176.149.19");
ip_list.Add("10.176.149.20");
ip_list.Add("10.176.149.21");
ip_list.Add("10.176.149.22");
ip_list.Add("10.176.149.23");
ip_list.Add("10.176.149.24");
ip_list.Add("10.176.149.25");
ip_list.Add("10.176.149.26");
try
{
foreach (var item in ip_list)
{
Task t = factory.StartNew(() =>
{
MySocket ms1 = new MySocket();
ms1.connect(item, Convert.ToInt32(10001));
System.Timers.Timer timer = new System.Timers.Timer(100);
timer.Elapsed += new System.Timers.ElapsedEventHandler(ms1.myTimer_Elapsed);
timer.Enabled = true;
ms1.client.Close();
}, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); //TaskCreationOptions.LongRunning
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
cts.Dispose();
}
catch (Exception ex)
{
Console.WriteLine(ex);
throw;
}
A程式MySocket類:
private static ManualResetEvent connectDone = new ManualResetEvent(false);
private static ManualResetEvent sendDone = new ManualResetEvent(false);
public void connect(string ip, int port)
{
Console.WriteLine("開始連接" + ip);
_ip = ip;
_port = port;
this.client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
this.client.SendTimeout = 1000;
this.client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
IPEndPoint serverEndPoint = new IPEndPoint(IPAddress.Parse(ip), port);
this.client.BeginConnect(serverEndPoint, new AsyncCallback(ConnectCallback), client);
if (connectDone.WaitOne(5 * 1000))
{
connectDone.Set();
}
else
{
connectDone.Reset();
}
}
public void myTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
try
{
byte[] sbuf = new byte[16];
sbuf[0] = 0x00;
sbuf[1] = 0x5A;
sbuf[2] = 0x41;
sbuf[3] = 0x00;
sbuf[4] = 0x07;
sbuf[5] = 0x00;
sbuf[6] = 0x00;
sbuf[7] = 0x00;
sbuf[8] = 0x00;
sbuf[9] = 0x00;
sbuf[10] = 0x00;
sbuf[11] = 0x00;
sbuf[12] = 0x00;
sbuf[13] = 0x00;
sbuf[14] = 0x00;
//SerialPort serialPort = new SerialPort("10002", 9600);
int j = 0;
for (int i = 0; i < 8; i++)
{
j += sbuf[i];
}
sbuf[15] = (byte)j;
if (this.client == null || this.client.Connected == false)
this.connect(this._ip, _port);
IAsyncResult result = this.client.BeginSend(sbuf, 0, sbuf.Length, 0, new AsyncCallback(SendCallback), this.client);
sendDone.WaitOne(5 * 1000);
sendDone.Reset();
}
catch (Exception f)
{
logger.Info("發送Socket請求");
logger.Info(f.Message);
logger.Info(f.InnerException);
}
}
private static void SendCallback(IAsyncResult ar)
{
try
{
Socket client1 = (Socket)ar.AsyncState;
client1.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, 10000);
int bytesSent = client1.EndSend(ar);
sendDone.Set();
string recStr = "";
byte[] recBytes = new byte[16];
int bytes = client1.Receive(recBytes, recBytes.Length, 0);
for (int i = 0; i < recBytes.Length; i++)
{
recStr += Convert.ToString(recBytes[i], 16) + " ";
}
RabbitMQSend(client1.RemoteEndPoint.ToString().Split(":")[0], recStr);
}
catch (SocketException ex)
{
//超時的錯誤代碼-10060
if (ex.ErrorCode == 10060)
{
logger.Debug("超時錯誤");
}
}
catch (Exception e)
{
sendDone.Set();
logger.Debug("發送完畢結束回傳值");
logger.Debug(e.Message);
Console.WriteLine(e.Message);
}
}
private static void ConnectCallback(IAsyncResult ar)
{
try
{
Socket client2 = (Socket)ar.AsyncState;
if (client2 == null)
client2.EndConnect(ar);
client2.IOControl(IOControlCode.KeepAliveValues, KeepAlive(1, 1000, 1000), null);
}
catch (SocketException se)
{
logger.Info("不能連接到服務器");
logger.Info(se.Message);
Console.WriteLine("不能連接到服務器:" + se.Message);
}
finally
{
connectDone.Set();
}
}
public static void RabbitMQSend(string ipAddress, string msg)
{
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/61295.html
標籤:單片機/工控
上一篇:大神來看看!遇到瓶頸了!
下一篇:紅包搭建怎么做,小白求教
